In [None]:
!wget https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
!tar -xvf spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
import os

# Tell findspark / PySpark where the JVM and the extracted Spark distribution live.
# NOTE(review): assumes OpenJDK 11 is installed at this path on the runtime image —
# confirm, or install a JDK in an earlier cell.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop2.7",
})

In [None]:
!pip install findspark

Collecting findspark
  Downloading findspark-1.4.2-py2.py3-none-any.whl (4.2 kB)
Installing collected packages: findspark
Successfully installed findspark-1.4.2


In [None]:
import findspark
# Makes the Spark installation at SPARK_HOME importable; must run before importing pyspark.
findspark.init()

from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F
# Explicit imports instead of `import *`: these are the only type names this
# notebook uses (for the CSV schema).
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.appName("ETL").getOrCreate()

Setting up a proper schema

In [None]:
# Exploratory read with a header row but no schema, to look at the raw data first.
df = (
    spark.read
    .option("header", "True")
    .csv('/content/drive/MyDrive/AlgoEngines/RandomData.csv')
)
df.show(5)


+-------------------+-----------+----------+------------+--------+------------+------+----------+------+
|          date_time|client_code|mobile_num|amount_spent|currency|     product|gender|       dob|refund|
+-------------------+-----------+----------+------------+--------+------------+------+----------+------+
|2020-09-11 01:17:46|        101|9123458445|         956|     INR|Premier Card|     M|1950-07-25| FALSE|
|2021-01-28 18:51:40|        101|9123458445|         871|     USD| Amazon Card|     M|1950-07-25| FALSE|
|2020-09-04 11:35:11|        101|9123458445|         861|     EUR| Cattle Card|     M|1950-07-25| FALSE|
|2020-07-31 23:42:16|        101|9123458445|         788|     INR|Premier Card|     M|1950-07-25| FALSE|
|2021-09-17 08:06:07|        101|9123458445|         437|     USD| Amazon Card|     M|1950-07-25| FALSE|
+-------------------+-----------+----------+------------+--------+------------+------+----------+------+
only showing top 5 rows



In [None]:
# Inspect the column types of the schema-less read; every column comes back as a
# string, which motivates the explicit schema below.
df.printSchema()

root
 |-- date_time: string (nullable = true)
 |-- client_code: string (nullable = true)
 |-- mobile_num: string (nullable = true)
 |-- amount_spent: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- product: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- refund: string (nullable = true)



In [None]:
# Explicit schema so client_code and amount_spent are typed as integers at read time.
# Each entry is (column name, Spark type, nullable flag). Note that the schema
# printed later still reports every field as nullable, so the False flags are
# not enforced by the CSV reader.
schema = StructType([
    StructField(col_name, col_type, col_nullable)
    for col_name, col_type, col_nullable in [
        ("date_time", StringType(), False),
        ("client_code", IntegerType(), True),
        ("mobile_num", StringType(), False),
        ("amount_spent", IntegerType(), False),
        ("currency", StringType(), False),
        ("product", StringType(), False),
        ("gender", StringType(), False),
        ("dob", StringType(), False),
        ("refund", StringType(), False),
    ]
])

In [None]:
# Re-read the same CSV, this time applying the explicit `schema`.
df = (
    spark.read
    .option("header", "True")
    .schema(schema)
    .csv("/content/drive/MyDrive/AlgoEngines/RandomData.csv")
)
df.show(5)
df.printSchema()

+-------------------+-----------+----------+------------+--------+------------+------+----------+------+
|          date_time|client_code|mobile_num|amount_spent|currency|     product|gender|       dob|refund|
+-------------------+-----------+----------+------------+--------+------------+------+----------+------+
|2020-09-11 01:17:46|        101|9123458445|         956|     INR|Premier Card|     M|1950-07-25| FALSE|
|2021-01-28 18:51:40|        101|9123458445|         871|     USD| Amazon Card|     M|1950-07-25| FALSE|
|2020-09-04 11:35:11|        101|9123458445|         861|     EUR| Cattle Card|     M|1950-07-25| FALSE|
|2020-07-31 23:42:16|        101|9123458445|         788|     INR|Premier Card|     M|1950-07-25| FALSE|
|2021-09-17 08:06:07|        101|9123458445|         437|     USD| Amazon Card|     M|1950-07-25| FALSE|
+-------------------+-----------+----------+------------+--------+------------+------+----------+------+
only showing top 5 rows

root
 |-- date_time: string (n

Problem Statement 1:
Add 2 fields that compute the aggregate of total spend by product and by gender for each day (resetting for the next day).

In [None]:
# Derive the calendar date of each transaction (dropping the time of day); the
# per-day aggregations below partition on this column.
df = df.withColumn("txn_date", F.to_date(F.col("date_time")))
df.orderBy("txn_date").show()

+-------------------+-----------+----------+------------+--------+------------+------+----------+------+----------+
|          date_time|client_code|mobile_num|amount_spent|currency|     product|gender|       dob|refund|  txn_date|
+-------------------+-----------+----------+------------+--------+------------+------+----------+------+----------+
|2020-01-08 15:05:01|        111|9325489657|         705|     USD| Cattle Card|     M|1989-09-27|  TRUE|2020-01-08|
|2020-01-19 19:12:08|        101|9123458445|         726|     INR| Amazon Card|     M|1950-07-25| FALSE|2020-01-19|
|2020-01-23 13:11:27|        109|9222356845|         820|     EUR| Amazon Card|     M|1988-09-27| FALSE|2020-01-23|
|2020-02-09 15:07:13|        112|9334512789|         875|     USD| Cattle Card|     F|1986-09-27| FALSE|2020-02-09|
|2020-02-23 19:45:52|        103|9111452369|         741|     USD| Amazon Card|     M|1979-09-27| FALSE|2020-02-23|
|2020-02-25 08:51:37|        106|9111111160|         476|     USD| Cattl

In [None]:
# One window per (day, product) and one per (day, gender). Neither window has an
# ordering, so each sum covers the whole partition: every row carries its group's
# full daily total, and totals start afresh on each new txn_date.
windowSpec_product = Window.partitionBy("txn_date", "product")
windowSpec_gender = Window.partitionBy("txn_date", "gender")

df1 = (
    df
    .withColumn("product_sum", F.sum("amount_spent").over(windowSpec_product))
    .withColumn("gender_sum", F.sum("amount_spent").over(windowSpec_gender))
)

(
    df1
    .select("date_time", "client_code", "amount_spent", "product", "gender",
            "txn_date", "product_sum", "gender_sum")
    .orderBy("txn_date")
    .show()
)

+-------------------+-----------+------------+------------+------+----------+-----------+----------+
|          date_time|client_code|amount_spent|     product|gender|  txn_date|product_sum|gender_sum|
+-------------------+-----------+------------+------------+------+----------+-----------+----------+
|2020-01-08 15:05:01|        111|         705| Cattle Card|     M|2020-01-08|        705|       705|
|2020-01-19 19:12:08|        101|         726| Amazon Card|     M|2020-01-19|        726|       726|
|2020-01-23 13:11:27|        109|         820| Amazon Card|     M|2020-01-23|        820|       820|
|2020-02-09 15:07:13|        112|         875| Cattle Card|     F|2020-02-09|        875|       875|
|2020-02-23 19:45:52|        103|         741| Amazon Card|     M|2020-02-23|        741|       741|
|2020-02-25 08:51:37|        106|         476| Cattle Card|     F|2020-02-25|        476|       476|
|2020-03-09 13:48:54|        109|         310|Premier Card|     M|2020-03-09|        310|  

In [None]:
# List df1's columns to choose which ones are carried into the final output.
df1.columns

['date_time',
 'client_code',
 'mobile_num',
 'amount_spent',
 'currency',
 'product',
 'gender',
 'dob',
 'refund',
 'txn_date',
 'product_sum',
 'gender_sum']

In [None]:
# Output frame for Problem 1: every df1 column except the helper `txn_date`.
# It is merged with the other problems' outputs on date_time at the end.
df1_final = df1.drop("txn_date")
df1_final.show(2)

+-------------------+-----------+----------+------------+--------+-----------+------+----------+------+-----------+----------+
|          date_time|client_code|mobile_num|amount_spent|currency|    product|gender|       dob|refund|product_sum|gender_sum|
+-------------------+-----------+----------+------------+--------+-----------+------+----------+------+-----------+----------+
|2020-05-12 00:40:47|        102|9112345778|         699|     EUR|Amazon Card|     F|2000-07-25| FALSE|       1149|      1149|
|2020-05-12 23:41:00|        110|9222245863|         450|     USD|Amazon Card|     F|1984-09-27| FALSE|       1149|      1149|
+-------------------+-----------+----------+------------+--------+-----------+------+----------+------+-----------+----------+
only showing top 2 rows



Problem Statement 2: Add 3 fields to the data frame where you have aggregate spend by client over 20/30/40 yrs old for the entire period

In [None]:

# Customer age in completed years, as of today. floor (not round) is used so that
# someone aged 29.6 is 29 rather than 30. Rounding would move customers into the
# next age bracket up to six months early.
# NOTE(review): current_date() makes the result depend on the day the notebook runs;
# use months_between(txn_date, dob) instead if age at transaction time is intended.
df2 = df.withColumn(
    "age",
    F.floor(F.months_between(F.current_date(), F.col("dob")) / F.lit(12)).cast("int"),
)

df2.printSchema()
df2.show(5)


+-------------------+-----------+----------+------------+--------+------------+------+----------+------+----------+----+
|          date_time|client_code|mobile_num|amount_spent|currency|     product|gender|       dob|refund|  txn_date| age|
+-------------------+-----------+----------+------------+--------+------------+------+----------+------+----------+----+
|2020-09-11 01:17:46|        101|9123458445|         956|     INR|Premier Card|     M|1950-07-25| FALSE|2020-09-11|71.0|
|2021-01-28 18:51:40|        101|9123458445|         871|     USD| Amazon Card|     M|1950-07-25| FALSE|2021-01-28|71.0|
|2020-09-04 11:35:11|        101|9123458445|         861|     EUR| Cattle Card|     M|1950-07-25| FALSE|2020-09-04|71.0|
|2020-07-31 23:42:16|        101|9123458445|         788|     INR|Premier Card|     M|1950-07-25| FALSE|2020-07-31|71.0|
|2021-09-17 08:06:07|        101|9123458445|         437|     USD| Amazon Card|     M|1950-07-25| FALSE|2021-09-17|71.0|
+-------------------+-----------

In [None]:
# Age bracket per row: age < 20 -> 10, 20-29 -> 20, 30-39 -> 30, 40 and over -> 40.
# The 40+ bracket is an explicit condition rather than `otherwise`, so rows whose
# age is null (missing or unparseable dob) get a null age_class. With `otherwise`
# they would be silently counted as 40+.

from pyspark.sql.functions import when

df2 = df2.withColumn(
    "age_class",
    when(df2.age < 20, F.lit(10))
    .when((df2.age >= 20) & (df2.age <= 29), F.lit(20))
    .when((df2.age >= 30) & (df2.age <= 39), F.lit(30))
    .when(df2.age >= 40, F.lit(40)),
)

df2.printSchema()

root
 |-- date_time: string (nullable = true)
 |-- client_code: integer (nullable = true)
 |-- mobile_num: string (nullable = true)
 |-- amount_spent: integer (nullable = true)
 |-- currency: string (nullable = true)
 |-- product: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- refund: string (nullable = true)
 |-- txn_date: date (nullable = true)
 |-- age: integer (nullable = true)
 |-- age_class: integer (nullable = false)



In [None]:
# Total spend per age bracket over the entire period. The window is unordered, so
# the sum covers the whole bracket; an ordering would only add a shuffle-sort.
# NOTE(review): the problem statement says "by client", but this aggregates across
# all clients in each bracket — confirm that interpretation.
windowSpec_class = Window.partitionBy("age_class")

df2 = df2.withColumn("spendings", F.sum("amount_spent").over(windowSpec_class))

# Spread the bracket total into one column per bracket (0 for rows in other brackets).
age_class = F.col("age_class")
spendings = F.col("spendings")
df2 = (
    df2
    .withColumn("spending_20s", F.when(age_class == 20, spendings).otherwise(0))
    .withColumn("spending_30s", F.when(age_class == 30, spendings).otherwise(0))
    .withColumn("spending_40+", F.when(age_class == 40, spendings).otherwise(0))
)

df2.show()

+-------------------+-----------+----------+------------+--------+------------+------+----------+------+----------+---+---------+------------+---------+------------+------------+------------+
|          date_time|client_code|mobile_num|amount_spent|currency|     product|gender|       dob|refund|  txn_date|age|age_class|spendings_20|spendings|spending_20s|spending_30s|spending_40+|
+-------------------+-----------+----------+------------+--------+------------+------+----------+------+----------+---+---------+------------+---------+------------+------------+------------+
|2020-09-07 09:51:26|        102|9112345778|         708|     USD|Premier Card|     F|2000-07-25| FALSE|2020-09-07| 21|       20|       19806|    19806|       19806|           0|           0|
|2020-07-02 23:23:30|        102|9112345778|         528|     EUR| Amazon Card|     F|2000-07-25| FALSE|2020-07-02| 21|       20|       19806|    19806|       19806|           0|           0|
|2020-09-09 03:31:07|        102|9112345

In [None]:
# Sanity check: the window-based `spendings` column must equal a plain groupBy
# total for every age_class. Fail loudly instead of relying on eyeballing.
class_totals = df2.groupBy("age_class").agg(F.sum("amount_spent").alias("total_spent"))
class_totals.show()

mismatched = (
    df2.select("age_class", "spendings").distinct()
    .join(class_totals, on="age_class")
    .filter(F.col("spendings") != F.col("total_spent"))
    .count()
)
assert mismatched == 0, f"{mismatched} age_class group(s) disagree with the groupBy totals"

+---------+-----------------+
|age_class|sum(amount_spent)|
+---------+-----------------+
|       40|            15571|
|       20|            19806|
|       30|            22265|
+---------+-----------------+



In [None]:
# List df2's columns before picking the output fields. `date_time` already exists
# (no rename is needed) and is the key for the final merge, so fail fast if it is absent.
assert "date_time" in df2.columns, "df2 is missing the date_time join key"
df2.columns

['date_time',
 'client_code',
 'mobile_num',
 'amount_spent',
 'currency',
 'product',
 'gender',
 'dob',
 'refund',
 'txn_date',
 'age',
 'age_class',
 'spendings_20',
 'spendings',
 'spending_20s',
 'spending_30s',
 'spending_40+']

In [None]:
# This df is created in order to merge with the final output df.
df2_final = df2.select('date_time','spending_20s',
 'spending_30s',
 'spending_40+')

df2_final.show(2)

+-------------------+------------+------------+------------+
|          date_time|spending_20s|spending_30s|spending_40+|
+-------------------+------------+------------+------------+
|2020-09-07 09:51:26|       19806|           0|           0|
|2020-07-02 23:23:30|       19806|           0|           0|
+-------------------+------------+------------+------------+
only showing top 2 rows



Problem Statement 3:
Add 1 field that computes the running total of refund values for each client code.

In [None]:
# Problem 3 starts from the base frame `df` so that this cell runs on a fresh kernel;
# `df3` does not exist before this point.
# refund_amount is the transaction amount for refunded rows and 0 otherwise. The flag
# is trimmed and upper-cased so that variants like "true" or " TRUE" are also recognised.
df3 = df.withColumn(
    "refund_amount",
    F.when(F.upper(F.trim(F.col("refund"))) == "TRUE", F.col("amount_spent")).otherwise(0),
)
df3.show()

+-------------------+-----------+----------+------------+--------+------------+------+----------+------+----------+-------------+-------------+
|          date_time|client_code|mobile_num|amount_spent|currency|     product|gender|       dob|refund|  txn_date|running_total|refund_amount|
+-------------------+-----------+----------+------------+--------+------------+------+----------+------+----------+-------------+-------------+
|2020-05-17 18:16:06|        108|9223548512|         260|     INR| Amazon Card|     F|1998-09-27| FALSE|2020-05-17|          260|            0|
|2020-10-23 05:49:38|        108|9223548512|         687|     USD| Cattle Card|     F|1998-09-27| FALSE|2020-10-23|          947|            0|
|2021-06-11 19:31:06|        108|9223548512|         807|     EUR|Premier Card|     F|1998-09-27| FALSE|2021-06-11|         1754|            0|
|2020-01-19 19:12:08|        101|9123458445|         726|     INR| Amazon Card|     M|1950-07-25| FALSE|2020-01-19|          726|       

In [None]:
# Running total of refunded amounts per client, in transaction-time order. Each row
# sums refund_amount over all of that client's rows up to and including itself.
# date_time is a "yyyy-MM-dd HH:mm:ss" string, so its lexical order is chronological.
# Technically this should also be split by currency; that is outside the problem
# statement and is ignored here.
running_refunds = (
    Window.partitionBy("client_code")
    .orderBy("date_time")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df3 = df3.withColumn("running_total", F.sum("refund_amount").over(running_refunds))
df3.orderBy("client_code", "date_time").show(100)

+-------------------+-----------+----------+------------+--------+------------+------+----------+------+----------+-------------+-------------+
|          date_time|client_code|mobile_num|amount_spent|currency|     product|gender|       dob|refund|  txn_date|running_total|refund_amount|
+-------------------+-----------+----------+------------+--------+------------+------+----------+------+----------+-------------+-------------+
|2020-01-19 19:12:08|        101|9123458445|         726|     INR| Amazon Card|     M|1950-07-25| FALSE|2020-01-19|            0|            0|
|2020-04-18 13:54:45|        101|9123458445|         183|     EUR|Premier Card|     M|1950-07-25| FALSE|2020-04-18|            0|            0|
|2020-06-06 13:36:52|        101|9123458445|         685|     INR| Amazon Card|     M|1950-07-25| FALSE|2020-06-06|            0|            0|
|2020-06-08 15:28:55|        101|9123458445|         638|     USD| Cattle Card|     M|1950-07-25| FALSE|2020-06-08|            0|       

In [None]:
# List df3's columns before picking the output fields. `date_time` already exists
# (no rename is needed) and is the key for the final merge, so fail fast if it is absent.
assert "date_time" in df3.columns, "df3 is missing the date_time join key"
df3.columns

['date_time',
 'client_code',
 'mobile_num',
 'amount_spent',
 'currency',
 'product',
 'gender',
 'dob',
 'refund',
 'txn_date',
 'running_total',
 'refund_amount']

In [None]:
# This df is created in order to merge with the final output df.
df3_final = df3.select('date_time', 'running_total')
df3_final.show(2)

+-------------------+-------------+
|          date_time|running_total|
+-------------------+-------------+
|2020-05-17 18:16:06|            0|
|2020-10-23 05:49:38|            0|
+-------------------+-------------+
only showing top 2 rows



Final DataFrame


In [None]:
# Merge the three problem outputs on date_time, the only column common to all of them.
# This is correct only if date_time uniquely identifies a row. Duplicate timestamps
# would turn this into a many-to-many join and silently multiply rows, so the
# assumption is enforced here. If it fails, every step above must carry a composite
# key (e.g. date_time + client_code + product) and join on that instead.
n_rows = df1_final.count()
assert df1_final.select("date_time").distinct().count() == n_rows, \
    "date_time is not unique; joining on it would duplicate rows"

df_final = df1_final.join(df2_final, ["date_time"]).join(df3_final, ["date_time"])
assert df_final.count() == n_rows, "final join changed the row count"
df_final.show()

+-------------------+-----------+----------+------------+--------+------------+------+----------+------+-----------+----------+------------+------------+------------+-------------+
|          date_time|client_code|mobile_num|amount_spent|currency|     product|gender|       dob|refund|product_sum|gender_sum|spending_20s|spending_30s|spending_40+|running_total|
+-------------------+-----------+----------+------------+--------+------------+------+----------+------+-----------+----------+------------+------------+------------+-------------+
|2020-05-12 00:40:47|        102|9112345778|         699|     EUR| Amazon Card|     F|2000-07-25| FALSE|       1149|      1149|       19806|           0|           0|            0|
|2020-05-12 23:41:00|        110|9222245863|         450|     USD| Amazon Card|     F|1984-09-27| FALSE|       1149|      1149|           0|       22265|           0|            0|
|2021-02-07 09:10:36|        112|9334512789|         931|     USD|Premier Card|     F|1986-09-2

Writing the DataFrame to the destination

In [None]:
# Persist the combined result as CSV with a header row, replacing any previous output.
(
    df_final.write
    .mode("overwrite")
    .option("header", 'True')
    .csv('/content/drive/MyDrive/AlgoEngines/output')
)