## PySpark on Google Colab

Spark is written in the Scala programming language and requires the Java Virtual Machine (JVM) to run. 

Therefore, our first task is to install Java.


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Next, we will download Apache Spark 3.2.1 with Hadoop 3.2 from here: https://spark.apache.org/downloads.html

In [None]:
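# Older releases are moved off dlcdn; if the download fails, look under https://archive.apache.org/dist/spark/spark-3.2.1/ instead.
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz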

Note: at the time of writing, 3.2.1 was the latest version of Apache Spark. Spark is released frequently, so if a newer version is available when you run this code, replace 3.2.1 wherever it appears with that version. Check the archive name on the downloads page as well, since the Hadoop suffix can also change between releases.
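One way to make the version swap less error-prone is to derive every version-dependent string from a single place. The following is a minimal sketch, not part of the original notebook: the helper name `spark_paths` and its parameters are hypothetical, and only the 3.2.1 / hadoop3.2 values and the `/content` install directory come from the cells in this section.

```python
# Hypothetical helper: builds the download URL, archive name, and SPARK_HOME
# from one version string, so a version change is made in a single place.
SPARK_VERSION = "3.2.1"
HADOOP_SUFFIX = "hadoop3.2"  # assumption: newer releases may use a different suffix


def spark_paths(version, hadoop_suffix,
                base_url="https://dlcdn.apache.org/spark",
                install_dir="/content"):
    """Return the version-dependent strings used by the setup cells."""
    folder = f"spark-{version}-bin-{hadoop_suffix}"
    return {
        "url": f"{base_url}/spark-{version}/{folder}.tgz",
        "archive": f"{folder}.tgz",
        "spark_home": f"{install_dir}/{folder}",
    }


paths = spark_paths(SPARK_VERSION, HADOOP_SUFFIX)
print(paths["url"])
print(paths["spark_home"])
```

The `base_url` argument is left as a parameter so that a different mirror can be substituted without touching the rest of the code.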

Now, we just need to extract that archive.

In [None]:
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

There is one last thing to install: the findspark library.
It locates Spark on the system and makes PySpark importable like a regular library.

In [None]:
!pip install -q findspark

Now that we have installed all the necessary dependencies in Colab, it is time to set the environment variables. This will enable us to run PySpark in the Colab environment.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
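A wrong path in either variable usually only surfaces later as a less obvious error, so it can help to check the directories right away. This is a small sketch under that assumption; `missing_paths` is a hypothetical helper, not something provided by findspark or Spark.

```python
import os


def missing_paths(var_names):
    """Return the environment variables that are unset or do not point to an existing directory."""
    return [
        name for name in var_names
        if not os.path.isdir(os.environ.get(name, ""))
    ]


# An empty list means both directories were found.
problems = missing_paths(["JAVA_HOME", "SPARK_HOME"])
print(problems or "all paths found")
```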

Time for the real test!

We need to locate Spark in the system. For that, we import findspark and use the findspark.init() method.

In [None]:
import findspark
findspark.init()

In [None]:
findspark.find()

'/content/spark-3.2.1-bin-hadoop3.2'

Now, we can import SparkSession from pyspark.sql and create a SparkSession, which is the entry point to Spark.

You can give a name to the session using appName() and add some configurations with config() if you wish.

Extra material: https://sparkbyexamples.com/pyspark/pyspark-what-is-sparksession/

In [None]:
# SparkSession: entry point to PySpark for working with RDDs / DataFrames

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
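The cell above uses the builder without a name or extra settings. As an illustration of the `appName()` and `config()` calls mentioned earlier, here is a minimal sketch; the application name, the option value, and the helper `build_session` are all hypothetical choices, not part of the original notebook.

```python
# Hypothetical application name and option values, chosen for illustration only.
APP_NAME = "black-friday-eda"
SESSION_OPTIONS = {
    # Assumption: fewer shuffle partitions than Spark's default of 200
    # is reasonable for a small dataset on a single machine.
    "spark.sql.shuffle.partitions": "8",
}


def build_session(app_name=APP_NAME, options=None):
    """Create (or reuse) a SparkSession with a name and extra configuration."""
    # Imported inside the function so that defining the helper does not itself need Spark.
    from pyspark.sql import SparkSession

    options = SESSION_OPTIONS if options is None else options
    builder = SparkSession.builder.appName(app_name)
    for key, value in options.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

Calling `spark = build_session()` would take the place of the plain `getOrCreate()` call. Note that `getOrCreate()` returns an already running session if there is one, so the name may not take effect unless this runs before any other session is created.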

In [None]:
spark

Import data!

This dataset was obtained from: https://datahack.analyticsvidhya.com/contest/black-friday/#ProblemStatement

In [None]:
!gdown https://drive.google.com/uc?id=1LnlIF2s8deJAgBQ2-wSso_yXM6_yVU9d

Downloading...
From: https://drive.google.com/uc?id=1LnlIF2s8deJAgBQ2-wSso_yXM6_yVU9d
To: /content/train.csv
100% 25.5M/25.5M [00:00<00:00, 145MB/s] 


### Business Problem:

A retail company, “ABC Private Limited”, wants to understand customer purchase behaviour (specifically, purchase amount) across various products of different categories. They have shared a purchase summary of various customers for selected high-volume products from last month.
The data set also contains customer demographics (age, gender, marital status, city_type, stay_in_current_city), product details (product_id and product category), and the total purchase_amount from last month.

Now, they want to build a model to predict the purchase amount of a customer for various products, which will help them create personalized offers for customers on different products.

## Reading csv files

If you are curious, the link below shows how to read different file types with Spark:

https://www.analyticsvidhya.com/blog/2020/10/data-engineering-101-data-sources-apache-spark/?utm_source=blog&utm_medium=working-with-pyspark-on-google-colab-for-data-scientists

In [None]:
df = spark.read.csv("train.csv", header=True)
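With only `header=True`, Spark takes the column names from the first row but reads every column as a string. One alternative is to pass an explicit schema as a DDL-formatted string. The sketch below is an assumption-laden illustration: the column types are guessed from the sample rows printed in this notebook, and `read_typed_csv` is a hypothetical helper that expects an existing SparkSession.

```python
# Column types are assumptions inferred from the sample rows printed in this notebook.
COLUMN_TYPES = {
    "User_ID": "INT",
    "Product_ID": "STRING",
    "Gender": "STRING",
    "Age": "STRING",                         # binned values such as "0-17"
    "Occupation": "INT",
    "City_Category": "STRING",
    "Stay_In_Current_City_Years": "STRING",  # contains the value "4+"
    "Marital_Status": "INT",
    "Product_Category_1": "INT",
    "Product_Category_2": "INT",
    "Product_Category_3": "INT",
    "Purchase": "DOUBLE",
}

# DDL-formatted schema string, e.g. "User_ID INT, Product_ID STRING, ..."
SCHEMA_DDL = ", ".join(f"{name} {dtype}" for name, dtype in COLUMN_TYPES.items())


def read_typed_csv(spark, path):
    """Read the CSV with the explicit schema instead of all-string columns."""
    return spark.read.csv(path, header=True, schema=SCHEMA_DDL)


print(SCHEMA_DDL)
```

Passing `inferSchema=True` to `spark.read.csv` is another option; it lets Spark guess the types at the cost of an extra pass over the file.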

## Show data

In [None]:
df.show(5)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|              null|              null|    1422|
|100

## Show column details

In [None]:
df.printSchema()

root
 |-- User_ID: string (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Product_Category_1: string (nullable = true)
 |-- Product_Category_2: string (nullable = true)
 |-- Product_Category_3: string (nullable = true)
 |-- Purchase: string (nullable = true)



## Official description
User_ID	-> User ID <br>
Product_ID	-> Product ID <br>
Gender	-> Sex of User <br>
Age	-> Age in bins <br>
Occupation	-> Occupation (Masked) <br>
City_Category	-> Category of the City (A,B,C) <br>
Stay_In_Current_City_Years	-> Number of years of stay in the current city <br>
Marital_Status	-> Marital Status <br>
Product_Category_1	-> Product Category (Masked) <br>
Product_Category_2	-> Product may also belong to another category (Masked) <br>
Product_Category_3	-> Product may also belong to another category (Masked) <br>
Purchase	-> Purchase Amount (Target Variable) <br>

### Number of rows and columns in DF

In [None]:
df.count(), len(df.columns)

(550068, 12)

## Display specific columns

In [None]:
df.select("User_ID","Gender","Age","Occupation").show(5)

+-------+------+----+----------+
|User_ID|Gender| Age|Occupation|
+-------+------+----+----------+
|1000001|     F|0-17|        10|
|1000001|     F|0-17|        10|
|1000001|     F|0-17|        10|
|1000001|     F|0-17|        10|
|1000002|     M| 55+|        16|
+-------+------+----+----------+
only showing top 5 rows



### Describing the columns

In [None]:
df.describe().show()

+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|       Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            550068|    550068|550068|550068|           550068|       550068|                    550068|             550068|            550068|            376430|            166821|           550068|
|   mean|1003028.8424013031|      null|  null|  null|8.076706879876669|         null|         1.468494139793958|0.40965298835780306| 5.404270017525106| 9.84232925112238

### Distinct values for Categorical columns

In [None]:
# Distinct value of one column
df.select("City_Category").distinct().show()

+-------------+
|City_Category|
+-------------+
|            B|
|            C|
|            A|
+-------------+



In [None]:
# Distinct value of two columns
df.select("City_Category", "Stay_In_Current_City_Years").distinct().show(10)

+-------------+--------------------------+
|City_Category|Stay_In_Current_City_Years|
+-------------+--------------------------+
|            A|                        4+|
|            B|                         2|
|            C|                         2|
|            A|                         3|
|            B|                         3|
|            C|                         1|
|            A|                         1|
|            C|                        4+|
|            C|                         0|
|            A|                         0|
+-------------+--------------------------+
only showing top 10 rows



In [None]:
# Distinct value of two columns + Order values
df.select("City_Category", "Stay_In_Current_City_Years").distinct().orderBy("City_Category", "Stay_In_Current_City_Years").show(10)

+-------------+--------------------------+
|City_Category|Stay_In_Current_City_Years|
+-------------+--------------------------+
|            A|                         0|
|            A|                         1|
|            A|                         2|
|            A|                         3|
|            A|                        4+|
|            B|                         0|
|            B|                         1|
|            B|                         2|
|            B|                         3|
|            B|                        4+|
+-------------+--------------------------+
only showing top 10 rows



## Group by statement

In [None]:
# Group by one column
df.groupBy("City_Category").count().show()

+-------------+------+
|City_Category| count|
+-------------+------+
|            B|231173|
|            C|171175|
|            A|147720|
+-------------+------+



In [None]:
# Group by two columns
df.groupBy("City_Category", "Stay_In_Current_City_Years").count().show(10)

+-------------+--------------------------+-----+
|City_Category|Stay_In_Current_City_Years|count|
+-------------+--------------------------+-----+
|            A|                        4+|22319|
|            B|                         2|41772|
|            C|                         2|32952|
|            A|                         3|24804|
|            B|                         3|42691|
|            C|                         1|61103|
|            A|                         1|49305|
|            C|                        4+|27797|
|            C|                         0|21533|
|            A|                         0|24178|
+-------------+--------------------------+-----+
only showing top 10 rows



In [None]:
# Count the rows for each combination of "City Category" and "Stay in current city years", ordered by both columns
df.groupBy("City_Category", "Stay_In_Current_City_Years").count().orderBy("City_Category", "Stay_In_Current_City_Years").show(10)

+-------------+--------------------------+-----+
|City_Category|Stay_In_Current_City_Years|count|
+-------------+--------------------------+-----+
|            A|                         0|24178|
|            A|                         1|49305|
|            A|                         2|27114|
|            A|                         3|24804|
|            A|                        4+|22319|
|            B|                         0|28687|
|            B|                         1|83413|
|            B|                         2|41772|
|            B|                         3|42691|
|            B|                        4+|34610|
+-------------+--------------------------+-----+
only showing top 10 rows



In [None]:
# Count the rows for each combination of "City Category" and "Stay in current city years", ordered by frequency (ascending)
df.groupBy("City_Category", "Stay_In_Current_City_Years").count().orderBy("count").show(5)

+-------------+--------------------------+-----+
|City_Category|Stay_In_Current_City_Years|count|
+-------------+--------------------------+-----+
|            C|                         0|21533|
|            A|                        4+|22319|
|            A|                         0|24178|
|            A|                         3|24804|
|            A|                         2|27114|
+-------------+--------------------------+-----+
only showing top 5 rows



In [None]:
# Count the rows for each combination of "City Category" and "Stay in current city years", ordered by frequency (descending)
df.groupBy("City_Category", "Stay_In_Current_City_Years").count().orderBy("count", ascending = False).show(5)

+-------------+--------------------------+-----+
|City_Category|Stay_In_Current_City_Years|count|
+-------------+--------------------------+-----+
|            B|                         1|83413|
|            C|                         1|61103|
|            A|                         1|49305|
|            B|                         3|42691|
|            B|                         2|41772|
+-------------+--------------------------+-----+
only showing top 5 rows



## Filtering data

Many more filtering options are described here: https://sparkbyexamples.com/pyspark/pyspark-where-filter/


In [None]:
import pyspark.sql.functions as f

In [None]:
# Filtering by City Category "A"
df.filter(df.City_Category == "A").show(5)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|             0|                12|              null|              null|    1422

In [None]:
# Also filtering by City Category "A"
df.filter(f.col("City_Category") == "A").show(5)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|             0|                12|              null|              null|    1422

In [None]:
# Filtering by two conditions
df.filter((f.col("City_Category") == "A") & (f.col("Gender") == "M")).show(5)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000003| P00193542|     M|26-35|        15|            A|                         3|             0|                 1|                 2|              null|   15227|
|1000005| P00274942|     M|26-35|        20|            A|                         1|             1|                 8|              null|              null|    7871|
|1000005| P00251242|     M|26-35|        20|            A|                         1|             1|                 5|                11|              null|    5254

In [None]:
# Filter by two conditions, then select a few columns and order the result
df.filter((f.col("City_Category") == "A") & (f.col("Gender") == "M"))\
  .select('User_ID', 'Product_ID')\
  .orderBy("Product_ID").show(10)

+-------+----------+
|User_ID|Product_ID|
+-------+----------+
|1003618| P00000142|
|1001501| P00000142|
|1003650| P00000142|
|1002459| P00000142|
|1003679| P00000142|
|1002994| P00000142|
|1003066| P00000142|
|1005684| P00000142|
|1003892| P00000142|
|1002658| P00000142|
+-------+----------+
only showing top 10 rows



## Updating column type

In [None]:
df = df.withColumn("Purchase", df.Purchase.cast('double'))
df.printSchema()

root
 |-- User_ID: string (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Product_Category_1: string (nullable = true)
 |-- Product_Category_2: string (nullable = true)
 |-- Product_Category_3: string (nullable = true)
 |-- Purchase: double (nullable = true)



## Aggregate functions (sum, avg, min, max...)

In [None]:
# Sum purchase per user
df.groupBy("User_ID").agg(f.sum("Purchase")).show(10)

+-------+-------------+
|User_ID|sum(Purchase)|
+-------+-------------+
|1000240|     854532.0|
|1000280|     189891.0|
|1000795|     559717.0|
|1000839|    4681205.0|
|1000888|     492236.0|
|1001866|     860896.0|
|1002011|    1370857.0|
|1002185|    1540819.0|
|1002442|     100036.0|
|1002783|     335592.0|
+-------+-------------+
only showing top 10 rows



In [None]:
# Sum purchase per user and get top 5
df.groupBy("User_ID").agg(f.sum("Purchase")).orderBy("sum(Purchase)", ascending = False).show(5)

+-------+-------------+
|User_ID|sum(Purchase)|
+-------+-------------+
|1004277|  1.0536909E7|
|1001680|    8699596.0|
|1002909|    7577756.0|
|1001941|    6817493.0|
|1000424|    6573609.0|
+-------+-------------+
only showing top 5 rows



In [None]:
# Same result, but with the aggregated column renamed
df.groupBy("User_ID").agg(f.sum("Purchase").alias("Total Purchase")).orderBy("Total Purchase", ascending = False).show(5)

+-------+--------------+
|User_ID|Total Purchase|
+-------+--------------+
|1004277|   1.0536909E7|
|1001680|     8699596.0|
|1002909|     7577756.0|
|1001941|     6817493.0|
|1000424|     6573609.0|
+-------+--------------+
only showing top 5 rows



In [None]:
# Average purchase per user (first 5 rows, not sorted)
df.groupBy("User_ID").agg(f.mean("Purchase")).show(5)

+-------+------------------+
|User_ID|     avg(Purchase)|
+-------+------------------+
|1000240| 9936.418604651162|
|1000280|            7303.5|
|1000795|         6996.4625|
|1000839|  10761.3908045977|
|1000888|10473.106382978724|
+-------+------------------+
only showing top 5 rows



In [None]:
# Round the average purchase, rename the column, and show the top 5
df.groupBy("User_ID").agg(f.round(f.mean("Purchase"), 2)\
                          .alias("Rounded_Purchase"))\
                          .orderBy("Rounded_Purchase", ascending = False).show(5)

+-------+----------------+
|User_ID|Rounded_Purchase|
+-------+----------------+
|1003902|        18577.89|
|1005069|        18490.17|
|1005999|        18345.94|
|1001349|        18162.74|
|1003461|         17508.7|
+-------+----------------+
only showing top 5 rows



In [None]:
# Create a new DF -> Calculate frequency, min, max, total and avg purchase per user

purchase_data = df.groupBy("User_ID").agg(f.count('User_ID').alias("Frequency"), 
                                          f.mean('Purchase').alias("Avg_Purchase"), 
                                          f.sum('Purchase').alias("Total_Purchase"), 
                                          f.min('Purchase').alias("Min_Purchase"), 
                                          f.max('Purchase').alias("Max_Purchase"))

purchase_data.show(10)

+-------+---------+------------------+--------------+------------+------------+
|User_ID|Frequency|      Avg_Purchase|Total_Purchase|Min_Purchase|Max_Purchase|
+-------+---------+------------------+--------------+------------+------------+
|1000240|       86| 9936.418604651162|      854532.0|       247.0|     19647.0|
|1000280|       26|            7303.5|      189891.0|       773.0|     19352.0|
|1000795|       80|         6996.4625|      559717.0|      1720.0|     20646.0|
|1000839|      435|  10761.3908045977|     4681205.0|       578.0|     23842.0|
|1000888|       47|10473.106382978724|      492236.0|        24.0|     19396.0|
|1001866|       90| 9565.511111111111|      860896.0|       932.0|     23589.0|
|1002011|      141| 9722.390070921985|     1370857.0|       474.0|     20108.0|
|1002185|      171| 9010.637426900585|     1540819.0|        25.0|     23277.0|
|1002442|       16|           6252.25|      100036.0|       377.0|     16475.0|
|1002783|       28| 11985.42857142857|  
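To make clear what each aggregate in `purchase_data` computes, here is a plain-Python sketch of the same per-user summary. It is an illustration only, not how Spark executes the query: it uses just the five sample rows shown elsewhere in this notebook, so the numbers are not the full per-user totals from the dataset.

```python
from collections import defaultdict

# (User_ID, Purchase) pairs taken from the five sample rows shown in this notebook.
rows = [
    ("1000001", 8370.0),
    ("1000001", 15200.0),
    ("1000001", 1422.0),
    ("1000001", 1057.0),
    ("1000002", 7969.0),
]

# Step 1: group the purchase amounts by user (the groupBy step).
purchases_by_user = defaultdict(list)
for user_id, purchase in rows:
    purchases_by_user[user_id].append(purchase)

# Step 2: compute one value per aggregate for each group (the agg step).
summary = {
    user_id: {
        "Frequency": len(values),
        "Avg_Purchase": sum(values) / len(values),
        "Total_Purchase": sum(values),
        "Min_Purchase": min(values),
        "Max_Purchase": max(values),
    }
    for user_id, values in purchases_by_user.items()
}

print(summary["1000001"])
```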

## Rename, create and drop columns

In [None]:
# Renaming column
purchase_data.withColumnRenamed("User_ID", "User_Number_ID").show(5)

+--------------+---------+------------------+--------------+------------+------------+
|User_Number_ID|Frequency|      Avg_Purchase|Total_Purchase|Min_Purchase|Max_Purchase|
+--------------+---------+------------------+--------------+------------+------------+
|       1000240|       86| 9936.418604651162|      854532.0|       247.0|     19647.0|
|       1000280|       26|            7303.5|      189891.0|       773.0|     19352.0|
|       1000795|       80|         6996.4625|      559717.0|      1720.0|     20646.0|
|       1000839|      435|  10761.3908045977|     4681205.0|       578.0|     23842.0|
|       1000888|       47|10473.106382978724|      492236.0|        24.0|     19396.0|
+--------------+---------+------------------+--------------+------------+------------+
only showing top 5 rows



In [None]:
# Renaming several columns
pr_df = purchase_data.withColumnRenamed("User_ID", "User_Number_ID") \
                     .withColumnRenamed("Frequency", "Freq") \
                     .withColumnRenamed("Avg_Purchase", "Avg")

pr_df.show(5)

+--------------+----+------------------+--------------+------------+------------+
|User_Number_ID|Freq|               Avg|Total_Purchase|Min_Purchase|Max_Purchase|
+--------------+----+------------------+--------------+------------+------------+
|       1000240|  86| 9936.418604651162|      854532.0|       247.0|     19647.0|
|       1000280|  26|            7303.5|      189891.0|       773.0|     19352.0|
|       1000795|  80|         6996.4625|      559717.0|      1720.0|     20646.0|
|       1000839| 435|  10761.3908045977|     4681205.0|       578.0|     23842.0|
|       1000888|  47|10473.106382978724|      492236.0|        24.0|     19396.0|
+--------------+----+------------------+--------------+------------+------------+
only showing top 5 rows



In [None]:
# Creating a new column from existing columns
pr_df.withColumn("Range", f.col("Max_Purchase") - f.col("Min_Purchase")).show(5)


+--------------+----+------------------+--------------+------------+------------+-------+
|User_Number_ID|Freq|               Avg|Total_Purchase|Min_Purchase|Max_Purchase|  Range|
+--------------+----+------------------+--------------+------------+------------+-------+
|       1000240|  86| 9936.418604651162|      854532.0|       247.0|     19647.0|19400.0|
|       1000280|  26|            7303.5|      189891.0|       773.0|     19352.0|18579.0|
|       1000795|  80|         6996.4625|      559717.0|      1720.0|     20646.0|18926.0|
|       1000839| 435|  10761.3908045977|     4681205.0|       578.0|     23842.0|23264.0|
|       1000888|  47|10473.106382978724|      492236.0|        24.0|     19396.0|19372.0|
+--------------+----+------------------+--------------+------------+------------+-------+
only showing top 5 rows



In [None]:
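# Creating a new column with conditional labels (conditions are checked in order; the first match wins)
pr_df.withColumn("Label", f.when(f.col("Freq") < 100, "Less than 100") \
                           .when(f.col("Freq") < 300, "Less than 300") \
                           .otherwise("More than 300")).show(10)  # otherwise() also covers Freq == 300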


+--------------+----+------------------+--------------+------------+------------+-------------+
|User_Number_ID|Freq|               Avg|Total_Purchase|Min_Purchase|Max_Purchase|        Label|
+--------------+----+------------------+--------------+------------+------------+-------------+
|       1000240|  86| 9936.418604651162|      854532.0|       247.0|     19647.0|Less than 100|
|       1000280|  26|            7303.5|      189891.0|       773.0|     19352.0|Less than 100|
|       1000795|  80|         6996.4625|      559717.0|      1720.0|     20646.0|Less than 100|
|       1000839| 435|  10761.3908045977|     4681205.0|       578.0|     23842.0|More than 300|
|       1000888|  47|10473.106382978724|      492236.0|        24.0|     19396.0|Less than 100|
|       1001866|  90| 9565.511111111111|      860896.0|       932.0|     23589.0|Less than 100|
|       1002011| 141| 9722.390070921985|     1370857.0|       474.0|     20108.0|Less than 300|
|       1002185| 171| 9010.637426900585|
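The chained `when()` calls are evaluated in order, and `otherwise()` catches everything that no earlier condition matched. A plain-Python sketch of the same rule, using a hypothetical function `frequency_label`, makes the boundary cases easier to see:

```python
def frequency_label(freq):
    """Mirror the chained when()/otherwise() logic: the first true condition wins."""
    if freq < 100:
        return "Less than 100"
    if freq < 300:
        return "Less than 300"
    # Fallback branch: reached for every value of 300 or more, including exactly 300.
    return "More than 300"


for freq in (86, 141, 300, 435):
    print(freq, "->", frequency_label(freq))
```

Because the fallback also covers a frequency of exactly 300, a label such as "300 or more" would describe that branch more precisely.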

In [None]:
# Drop one column

new_df = df.drop("Stay_In_Current_City_Years")
new_df.show(5)

+-------+----------+------+----+----------+-------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|             0|                 3|              null|              null|  8370.0|
|1000001| P00248942|     F|0-17|        10|            A|             0|                 1|                 6|                14| 15200.0|
|1000001| P00087842|     F|0-17|        10|            A|             0|                12|              null|              null|  1422.0|
|1000001| P00085442|     F|0-17|        10|            A|             0|                12|                14|              null|  1057.0|
|1000002| P00285442|     M|

In [None]:
# Drop N columns

columns_to_drop = ["Stay_In_Current_City_Years","Gender", "Product_Category_1", "Product_Category_2"]
new_df = df.drop(*columns_to_drop)

new_df.show(5)

+-------+----------+----+----------+-------------+--------------+------------------+--------+
|User_ID|Product_ID| Age|Occupation|City_Category|Marital_Status|Product_Category_3|Purchase|
+-------+----------+----+----------+-------------+--------------+------------------+--------+
|1000001| P00069042|0-17|        10|            A|             0|              null|  8370.0|
|1000001| P00248942|0-17|        10|            A|             0|                14| 15200.0|
|1000001| P00087842|0-17|        10|            A|             0|              null|  1422.0|
|1000001| P00085442|0-17|        10|            A|             0|              null|  1057.0|
|1000002| P00285442| 55+|        16|            C|             0|              null|  7969.0|
+-------+----------+----+----------+-------------+--------------+------------------+--------+
only showing top 5 rows

