In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as func
from pyspark.sql import Window

In [2]:
sc = SparkContext(appName="step1")
spark = SparkSession(sc)

In [3]:
input_path = "./raw_input/"

In [4]:
df = spark.read.csv(input_path+"240.csv", header=True, inferSchema=True).dropna()
df = df.withColumn("discount", func.round((df.Price - df.PriceChangeTo) / df.Price * 100, 0).cast('Int'))
df = df.withColumn('app_id', func.lit('240'))
df.show(100)

+-------------------+--------+-----+-------------+--------+-------------+--------------+--------------+------------+------+
|               date|  Owners|Price|PriceChangeTo|discount|total_reviews|total_positive|total_negative|review_score|app_id|
+-------------------+--------+-----+-------------+--------+-------------+--------------+--------------+------------+------+
|2015-06-11 00:00:00|13830000|19.99|         7.49|      63|        36287|         35177|          1110|           9|   240|
|2015-06-22 00:00:00|13893000| 7.49|        19.99|    -167|        36806|         35677|          1129|           9|   240|
|2015-11-25 00:00:00|14300000|19.99|         7.49|      63|        40673|         39321|          1352|           9|   240|
|2015-12-01 00:00:00|14314000| 7.49|        19.99|    -167|        40938|         39569|          1369|           9|   240|
|2015-12-22 00:00:00|14362000|19.99|         7.49|      63|        41547|         40150|          1397|           9|   240|
|2016-01

In [5]:
df = df.withColumn("index", func.row_number().over(Window.orderBy(func.monotonically_increasing_id())))
df.show()

+-------------------+--------+-----+-------------+--------+-------------+--------------+--------------+------------+------+-----+
|               date|  Owners|Price|PriceChangeTo|discount|total_reviews|total_positive|total_negative|review_score|app_id|index|
+-------------------+--------+-----+-------------+--------+-------------+--------------+--------------+------------+------+-----+
|2015-06-11 00:00:00|13830000|19.99|         7.49|      63|        36287|         35177|          1110|           9|   240|    1|
|2015-06-22 00:00:00|13893000| 7.49|        19.99|    -167|        36806|         35677|          1129|           9|   240|    2|
|2015-11-25 00:00:00|14300000|19.99|         7.49|      63|        40673|         39321|          1352|           9|   240|    3|
|2015-12-01 00:00:00|14314000| 7.49|        19.99|    -167|        40938|         39569|          1369|           9|   240|    4|
|2015-12-22 00:00:00|14362000|19.99|         7.49|      63|        41547|         40150|  

In [6]:
colums = ["date", "total_reviews", "total_positive", "total_negative", "review_score", "discount"]
for c in colums:
    df = df.withColumn("prev_"+c, func.lag(func.col(c), 1).over(Window.orderBy("index")))
for c in colums:
    df = df.withColumn("next_"+c, func.lead(func.col(c), 1).over(Window.orderBy("index")))
df.show(2)

+-------------------+--------+-----+-------------+--------+-------------+--------------+--------------+------------+------+-----+-------------------+------------------+-------------------+-------------------+-----------------+-------------+-------------------+------------------+-------------------+-------------------+-----------------+-------------+
|               date|  Owners|Price|PriceChangeTo|discount|total_reviews|total_positive|total_negative|review_score|app_id|index|          prev_date|prev_total_reviews|prev_total_positive|prev_total_negative|prev_review_score|prev_discount|          next_date|next_total_reviews|next_total_positive|next_total_negative|next_review_score|next_discount|
+-------------------+--------+-----+-------------+--------+-------------+--------------+--------------+------------+------+-----+-------------------+------------------+-------------------+-------------------+-----------------+-------------+-------------------+------------------+-----------------

In [7]:
res = df.filter(df.discount < 0).dropna()
res = res.withColumn("total_increase", res.total_reviews - res.prev_total_reviews)\
        .withColumn("positive_increase", res.total_positive - res.prev_total_positive)\
        .withColumn("negative_increase", res.total_negative - res.prev_total_negative)\
        .withColumn("days_increase", func.datediff(res.date, res.prev_date))\
        .withColumn("total_normal", res.next_total_reviews - res.total_reviews)\
        .withColumn("positive_normal", res.next_total_positive - res.total_positive)\
        .withColumn("negative_normal", res.next_total_negative - res.total_negative)\
        .withColumn("days_normal", func.datediff(res.next_date, res.date))\
        .withColumn("raw_price", res.PriceChangeTo)\
        .withColumn("sale_price", res.Price)\
        .withColumn("discount", res.prev_discount)#.\
        # select("date", "raw_price", "sale_price", "discount",\
        #         "total_increase", "positive_increase", "negative_increase", "days_increase",\
        #         "total_normal", "positive_normal", "negative_normal", "days_normal")

In [8]:
res = res.withColumn("total_increase_rate", res.total_increase / res.days_increase)\
        .withColumn("total_normal_rate", res.total_normal / res.days_normal)

In [9]:
res.select("date", "raw_price", "sale_price", "discount", "total_increase_rate", "total_normal_rate").show()

+-------------------+---------+----------+--------+-------------------+------------------+
|               date|raw_price|sale_price|discount|total_increase_rate| total_normal_rate|
+-------------------+---------+----------+--------+-------------------+------------------+
|2015-06-22 00:00:00|    19.99|      7.49|      63|  47.18181818181818| 24.78846153846154|
|2015-12-01 00:00:00|    19.99|      7.49|      63| 44.166666666666664|              29.0|
|2016-01-04 00:00:00|    19.99|      7.49|      63|  53.69230769230769|          33.21875|
|2016-02-12 00:00:00|    19.99|      4.99|      75| 56.857142857142854|22.768421052631577|
|2016-11-29 00:00:00|    19.99|      6.79|      66|              379.0|26.608695652173914|
|2017-01-02 00:00:00|    19.99|      6.79|      66|  55.09090909090909| 24.98830409356725|
|2017-07-05 00:00:00|    19.99|      6.79|      66|               70.0| 25.94674556213018|
|2018-01-04 00:00:00|    19.99|      6.79|      66|               55.5| 26.38095238095238|

In [10]:
res.columns

['date',
 'Owners',
 'Price',
 'PriceChangeTo',
 'discount',
 'total_reviews',
 'total_positive',
 'total_negative',
 'review_score',
 'app_id',
 'index',
 'prev_date',
 'prev_total_reviews',
 'prev_total_positive',
 'prev_total_negative',
 'prev_review_score',
 'prev_discount',
 'next_date',
 'next_total_reviews',
 'next_total_positive',
 'next_total_negative',
 'next_review_score',
 'next_discount',
 'total_increase',
 'positive_increase',
 'negative_increase',
 'days_increase',
 'total_normal',
 'positive_normal',
 'negative_normal',
 'days_normal',
 'raw_price',
 'sale_price',
 'total_increase_rate',
 'total_normal_rate']

In [11]:
res = res.drop('Price', 'PriceChangeTo')
res.columns

['date',
 'Owners',
 'discount',
 'total_reviews',
 'total_positive',
 'total_negative',
 'review_score',
 'app_id',
 'index',
 'prev_date',
 'prev_total_reviews',
 'prev_total_positive',
 'prev_total_negative',
 'prev_review_score',
 'prev_discount',
 'next_date',
 'next_total_reviews',
 'next_total_positive',
 'next_total_negative',
 'next_review_score',
 'next_discount',
 'total_increase',
 'positive_increase',
 'negative_increase',
 'days_increase',
 'total_normal',
 'positive_normal',
 'negative_normal',
 'days_normal',
 'raw_price',
 'sale_price',
 'total_increase_rate',
 'total_normal_rate']

In [12]:
res = res.withColumn("sale_price_scale", (res.sale_price/10).cast('Int'))
res.select("sale_price_scale").show(5)

+----------------+
|sale_price_scale|
+----------------+
|               0|
|               0|
|               0|
|               0|
|               0|
+----------------+
only showing top 5 rows



In [13]:
distinct_sale_price = res.dropDuplicates(['sale_price']).select("index", "sale_price").withColumn("historical_low", func.lit(1))
distinct_sale_price.show()

+-----+----------+--------------+
|index|sale_price|historical_low|
+-----+----------+--------------+
|    2|      7.49|             1|
|    8|      4.99|             1|
|   10|      6.79|             1|
|   23|      2.49|             1|
|   29|      0.99|             1|
|   35|      1.99|             1|
+-----+----------+--------------+



In [14]:
distinct_sale_price = distinct_sale_price.withColumn('prev_sale_price', \
    func.lag(func.col('sale_price'), 1).over(Window.orderBy("index")))
distinct_sale_price = distinct_sale_price.withColumn('diff', distinct_sale_price.sale_price - distinct_sale_price.prev_sale_price).fillna(-1)
distinct_sale_price.show()

+-----+----------+--------------+---------------+-------------------+
|index|sale_price|historical_low|prev_sale_price|               diff|
+-----+----------+--------------+---------------+-------------------+
|    2|      7.49|             1|           -1.0|               -1.0|
|    8|      4.99|             1|           7.49|               -2.5|
|   10|      6.79|             1|           4.99| 1.7999999999999998|
|   23|      2.49|             1|           6.79|               -4.3|
|   29|      0.99|             1|           2.49|-1.5000000000000002|
|   35|      1.99|             1|           0.99|                1.0|
+-----+----------+--------------+---------------+-------------------+



In [15]:
distinct_sale_price.where(distinct_sale_price.diff > 0).count()

2

In [16]:
distinct_sale_price = distinct_sale_price.withColumn('historical_low', func.when(distinct_sale_price.diff > 0, 0).otherwise(1))\
    .where(distinct_sale_price.historical_low == 1)
distinct_sale_price.show()

+-----+----------+--------------+---------------+-------------------+
|index|sale_price|historical_low|prev_sale_price|               diff|
+-----+----------+--------------+---------------+-------------------+
|    2|      7.49|             1|           -1.0|               -1.0|
|    8|      4.99|             1|           7.49|               -2.5|
|   10|      6.79|             0|           4.99| 1.7999999999999998|
|   23|      2.49|             1|           6.79|               -4.3|
|   29|      0.99|             1|           2.49|-1.5000000000000002|
|   35|      1.99|             0|           0.99|                1.0|
+-----+----------+--------------+---------------+-------------------+



In [17]:
distinct_sale_price = res.dropDuplicates(['sale_price']).select("index", "sale_price")

distinct_sale_price = distinct_sale_price.withColumn('prev_sale_price', \
    func.lag(func.col('sale_price'), 1).over(Window.orderBy("index")))

distinct_sale_price = distinct_sale_price.withColumn('diff', \
    distinct_sale_price.sale_price - distinct_sale_price.prev_sale_price).fillna(-1)

isRepeat = distinct_sale_price.where(distinct_sale_price.diff > 0).count() > 0

while isRepeat:
    distinct_sale_price = distinct_sale_price.withColumn('historical_low', \
        func.when(distinct_sale_price.diff > 0, 0).otherwise(1))
    distinct_sale_price = distinct_sale_price.where(distinct_sale_price.historical_low == 1)

    distinct_sale_price = distinct_sale_price.withColumn('prev_sale_price', \
        func.lag(func.col('sale_price'), 1).over(Window.orderBy("index")))
    
    distinct_sale_price = distinct_sale_price.withColumn('diff', \
        distinct_sale_price.sale_price - distinct_sale_price.prev_sale_price).fillna(-1)

    isRepeat = distinct_sale_price.where(distinct_sale_price.diff > 0).count() > 0

distinct_sale_price.show()

+-----+----------+---------------+-------------------+--------------+
|index|sale_price|prev_sale_price|               diff|historical_low|
+-----+----------+---------------+-------------------+--------------+
|    2|      7.49|           -1.0|               -1.0|             1|
|    8|      4.99|           7.49|               -2.5|             1|
|   23|      2.49|           4.99|               -2.5|             1|
|   29|      0.99|           2.49|-1.5000000000000002|             1|
+-----+----------+---------------+-------------------+--------------+



In [18]:
index_list = [row['index'] for row in distinct_sale_price.select('index').collect()]
index_list

[2, 8, 23, 29]

In [19]:
res = res.withColumn('historical_low', func.when(res.index.isin(index_list), 1).otherwise(0))
res.select('sale_price', 'historical_low').show()

+----------+--------------+
|sale_price|historical_low|
+----------+--------------+
|      7.49|             1|
|      7.49|             0|
|      7.49|             0|
|      4.99|             1|
|      6.79|             0|
|      6.79|             0|
|      6.79|             0|
|      6.79|             0|
|      6.79|             0|
|      6.79|             0|
|      2.49|             1|
|      2.49|             0|
|      2.49|             0|
|      0.99|             1|
|      0.99|             0|
|      0.99|             0|
|      1.99|             0|
|      1.99|             0|
|      1.99|             0|
|      1.99|             0|
+----------+--------------+
only showing top 20 rows



In [20]:
res.columns

['date',
 'Owners',
 'discount',
 'total_reviews',
 'total_positive',
 'total_negative',
 'review_score',
 'app_id',
 'index',
 'prev_date',
 'prev_total_reviews',
 'prev_total_positive',
 'prev_total_negative',
 'prev_review_score',
 'prev_discount',
 'next_date',
 'next_total_reviews',
 'next_total_positive',
 'next_total_negative',
 'next_review_score',
 'next_discount',
 'total_increase',
 'positive_increase',
 'negative_increase',
 'days_increase',
 'total_normal',
 'positive_normal',
 'negative_normal',
 'days_normal',
 'raw_price',
 'sale_price',
 'total_increase_rate',
 'total_normal_rate',
 'sale_price_scale',
 'historical_low']

In [23]:
tags_df = spark.read.csv("./tags_input/joint_category_genre.csv", header=True, inferSchema=True)
tags_df.show(5)

+------+--------------------+----------+
|    id|            category|     genre|
+------+--------------------+----------+
|578080|    1,49,36,15,41,42|1,25,37,29|
|   550|2,1,49,36,9,38,22...|         1|
|218620|2,1,9,38,22,28,29...|       1,3|
|  4000|2,1,49,36,47,9,38...|     23,28|
|   240|  1,27,22,23,8,15,16|         1|
+------+--------------------+----------+
only showing top 5 rows



In [24]:
tags_df = tags_df.withColumn('category', func.split(func.col('category'), ',')).withColumn('genre', func.split(func.col('genre'), ','))
tags_df.show(5)

+------+--------------------+---------------+
|    id|            category|          genre|
+------+--------------------+---------------+
|578080|[1, 49, 36, 15, 4...|[1, 25, 37, 29]|
|   550|[2, 1, 49, 36, 9,...|            [1]|
|218620|[2, 1, 9, 38, 22,...|         [1, 3]|
|  4000|[2, 1, 49, 36, 47...|       [23, 28]|
|   240|[1, 27, 22, 23, 8...|            [1]|
+------+--------------------+---------------+
only showing top 5 rows



In [25]:
tags_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- genre: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [26]:
joint_df = res.join(tags_df, res.app_id == tags_df.id, 'inner')
joint_df.show(5)

+-------------------+--------+--------+-------------+--------------+--------------+------------+------+-----+-------------------+------------------+-------------------+-------------------+-----------------+-------------+-------------------+------------------+-------------------+-------------------+-----------------+-------------+--------------+-----------------+-----------------+-------------+------------+---------------+---------------+-----------+---------+----------+-------------------+------------------+----------------+--------------+---+--------------------+-----+
|               date|  Owners|discount|total_reviews|total_positive|total_negative|review_score|app_id|index|          prev_date|prev_total_reviews|prev_total_positive|prev_total_negative|prev_review_score|prev_discount|          next_date|next_total_reviews|next_total_positive|next_total_negative|next_review_score|next_discount|total_increase|positive_increase|negative_increase|days_increase|total_normal|positive_norma

In [28]:
df1 = joint_df.select('app_id', 'index', func.explode('category').alias('id'))

In [30]:
df1.show(10)

+------+-----+---+
|app_id|index| id|
+------+-----+---+
|   240|    2|  1|
|   240|    2| 27|
|   240|    2| 22|
|   240|    2| 23|
|   240|    2|  8|
|   240|    2| 15|
|   240|    2| 16|
|   240|    4|  1|
|   240|    4| 27|
|   240|    4| 22|
+------+-----+---+
only showing top 10 rows



In [33]:
test = df1.groupby('index').pivot('id').agg(func.lit(1))
test.show(100)

+-----+---+---+---+---+---+---+---+
|index|  1| 15| 16| 22| 23| 27|  8|
+-----+---+---+---+---+---+---+---+
|   31|  1|  1|  1|  1|  1|  1|  1|
|   27|  1|  1|  1|  1|  1|  1|  1|
|   12|  1|  1|  1|  1|  1|  1|  1|
|   47|  1|  1|  1|  1|  1|  1|  1|
|    6|  1|  1|  1|  1|  1|  1|  1|
|   16|  1|  1|  1|  1|  1|  1|  1|
|   20|  1|  1|  1|  1|  1|  1|  1|
|   41|  1|  1|  1|  1|  1|  1|  1|
|   43|  1|  1|  1|  1|  1|  1|  1|
|   37|  1|  1|  1|  1|  1|  1|  1|
|   35|  1|  1|  1|  1|  1|  1|  1|
|    4|  1|  1|  1|  1|  1|  1|  1|
|    8|  1|  1|  1|  1|  1|  1|  1|
|   23|  1|  1|  1|  1|  1|  1|  1|
|   39|  1|  1|  1|  1|  1|  1|  1|
|   10|  1|  1|  1|  1|  1|  1|  1|
|   45|  1|  1|  1|  1|  1|  1|  1|
|   25|  1|  1|  1|  1|  1|  1|  1|
|   29|  1|  1|  1|  1|  1|  1|  1|
|   33|  1|  1|  1|  1|  1|  1|  1|
|   14|  1|  1|  1|  1|  1|  1|  1|
|    2|  1|  1|  1|  1|  1|  1|  1|
|   18|  1|  1|  1|  1|  1|  1|  1|
+-----+---+---+---+---+---+---+---+



In [34]:
sc.stop()