In [1]:
# Data Transformation Problem and solution


In [28]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd
from datetime import datetime as dt

In [29]:
pdf=pd.DataFrame({"id":["s1,d1","s1,d1","s1,d2","s1,d2","s2,d1","s2,d1","s2,d2","s2,d2"],"date":["2020-01-01","2020-01-03","2020-01-01","2020-01-03","2020-01-01","2020-01-02","2020-01-01","2020-01-02"],"impr":[30,100,70,100,90,150,210,250]})
pdf

Unnamed: 0,id,date,impr
0,"s1,d1",2020-01-01,30
1,"s1,d1",2020-01-03,100
2,"s1,d2",2020-01-01,70
3,"s1,d2",2020-01-03,100
4,"s2,d1",2020-01-01,90
5,"s2,d1",2020-01-02,150
6,"s2,d2",2020-01-01,210
7,"s2,d2",2020-01-02,250


In [30]:
date_list=list(pdf["date"].unique())
date_list.sort()
date_list

['2020-01-01', '2020-01-02', '2020-01-03']

In [31]:
# Create Spark Session
spark = SparkSession.builder.appName("asheesh").getOrCreate()


In [32]:
# Create Spark DataFrame
df=spark.createDataFrame(pdf)

In [33]:
df=df.withColumn("slot_id", F.udf(lambda x: x.split(",")[0], StringType())(df.id))
df.show()

+-----+----------+----+-------+
|   id|      date|impr|slot_id|
+-----+----------+----+-------+
|s1,d1|2020-01-01|  30|     s1|
|s1,d1|2020-01-03| 100|     s1|
|s1,d2|2020-01-01|  70|     s1|
|s1,d2|2020-01-03| 100|     s1|
|s2,d1|2020-01-01|  90|     s2|
|s2,d1|2020-01-02| 150|     s2|
|s2,d2|2020-01-01| 210|     s2|
|s2,d2|2020-01-02| 250|     s2|
+-----+----------+----+-------+



In [34]:
df=df.groupby("slot_id","date").sum("impr").withColumnRenamed("sum(impr)","impr")
df.show()

+-------+----------+----+
|slot_id|      date|impr|
+-------+----------+----+
|     s2|2020-01-01| 300|
|     s1|2020-01-01| 100|
|     s2|2020-01-02| 400|
|     s1|2020-01-03| 200|
+-------+----------+----+



In [35]:
df=df.groupby("slot_id").agg(F.collect_list("impr").alias('impr_list'),F.collect_list("date").alias('dates'))
df.show(20,truncate=False)

+-------+----------+------------------------+
|slot_id|impr_list |dates                   |
+-------+----------+------------------------+
|s2     |[300, 400]|[2020-01-01, 2020-01-02]|
|s1     |[100, 200]|[2020-01-01, 2020-01-03]|
+-------+----------+------------------------+



In [36]:
def get_date_wise_sorted_impr(dates,impr_list):
    imprs=[]
    i=0
    for date in date_list:
        if date in dates:
            imprs.append(impr_list[i])
            i=i+1
        else:
            imprs.append(0)
    return imprs

In [37]:
df=df.withColumn("impr_hist", F.udf(lambda dats,impr_lst: get_date_wise_sorted_impr(dats,impr_lst), StringType())(df.dates,df.impr_list))
df=df.select("slot_id","dates","impr_hist")
df.show(truncate=False)

+-------+------------------------+-------------+
|slot_id|dates                   |impr_hist    |
+-------+------------------------+-------------+
|s2     |[2020-01-01, 2020-01-02]|[300, 400, 0]|
|s1     |[2020-01-01, 2020-01-03]|[100, 0, 200]|
+-------+------------------------+-------------+



In [38]:
# Convert Date String List into Date list
df=df.withColumn("dates", F.udf(lambda dd: [dt.strptime(x, '%Y-%m-%d') for x in dd], ArrayType(DateType()))(df.dates))
df.show(truncate=False)

+-------+------------------------+-------------+
|slot_id|dates                   |impr_hist    |
+-------+------------------------+-------------+
|s2     |[2020-01-01, 2020-01-02]|[300, 400, 0]|
|s1     |[2020-01-01, 2020-01-03]|[100, 0, 200]|
+-------+------------------------+-------------+



In [39]:
df.printSchema()

root
 |-- slot_id: string (nullable = true)
 |-- dates: array (nullable = true)
 |    |-- element: date (containsNull = true)
 |-- impr_hist: string (nullable = true)



In [40]:
# Date to WeekDays
df=df.withColumn("weekday", F.udf(lambda dd: [x.strftime('%A') for x in dd], ArrayType(StringType()))(df.dates))
df.show(truncate=False)

+-------+------------------------+-------------+---------------------+
|slot_id|dates                   |impr_hist    |weekday              |
+-------+------------------------+-------------+---------------------+
|s2     |[2020-01-01, 2020-01-02]|[300, 400, 0]|[Wednesday, Thursday]|
|s1     |[2020-01-01, 2020-01-03]|[100, 0, 200]|[Wednesday, Friday]  |
+-------+------------------------+-------------+---------------------+



In [41]:
# Slot id weekdays wise 
df=df.select(df.slot_id,F.explode(df.weekday))
df.show(truncate=False)

+-------+---------+
|slot_id|col      |
+-------+---------+
|s2     |Wednesday|
|s2     |Thursday |
|s1     |Wednesday|
|s1     |Friday   |
+-------+---------+

