<a href="https://colab.research.google.com/github/easycloudapi/learn_ml/blob/main/proof_of_concepts/2nd_ML_Project_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ML Project with PySpark

This is my second ML project, developed with PySpark.

Ref:
1. https://www.analyticsvidhya.com/blog/2022/09/machine-learning-pipeline-in-pyspark/
2. https://intellipaat.com/blog/tutorial/spark-tutorial/machine-learning-with-pyspark-tutorial/
3. https://www.datacamp.com/tutorial/pyspark-tutorial-getting-started-with-pyspark

# Install Python Packages:

In [1]:
! pip install -q pyspark


# Step 1 - Initiate PySpark Session:

In [26]:
from pyspark.sql import SparkSession

# get the active Spark session, if any (returns None when there is none)
spark = SparkSession.getActiveSession()

# stop an existing session so the new one starts with a clean configuration
if spark:
    spark.stop()

# create a new local Spark session with 4 worker threads
spark = SparkSession.builder  \
                    .appName("ML Model With PySpark")  \
                    .master("local[4]")  \
                    .getOrCreate()

# display the Spark session details
spark

# Step 2 - Load The Data Into Dataframe:

## Unzip the online zip file without downloading:

Ref:
1. online file read: https://copyprogramming.com/howto/pandas-how-to-read-excel-file-from-zip-archive

In [3]:
from urllib.request import urlopen
import zipfile
import io
import pandas as pd

data_source_zip_url = "https://archive.ics.uci.edu/static/public/352/online+retail.zip"
data_source_file_name = "Online Retail.xlsx"

zipped_file_bytes_object = urlopen(url=data_source_zip_url, timeout=15)  # returns an HTTPResponse (file-like) object; .read() yields the bytes
print(f"Successfully read, {data_source_zip_url}")

zipped_file_bytes_content = zipped_file_bytes_object.read()
unzipped_file = zipfile.ZipFile(io.BytesIO(zipped_file_bytes_content))
print("file has been unzipped successfully")

Successfully read, https://archive.ics.uci.edu/static/public/352/online+retail.zip
file has been unzipped successfully
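
The in-memory unzip pattern above can be tried without a network call. The sketch below is a minimal illustration, not the notebook's actual data: it builds a small archive in memory (the member name `example.csv` and its contents are made up for the demo), reads it back through `io.BytesIO`, and checks `namelist()` before opening a member, which guards against a wrong or renamed file name.

```python
import io
import zipfile

# Build a tiny zip archive in memory; member name and contents are hypothetical.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, mode="w") as archive:
    archive.writestr("example.csv", "InvoiceNo,Quantity\n536365,6\n")

# Treat the raw bytes the same way as bytes downloaded from a URL.
zip_bytes = buffer.getvalue()

with zipfile.ZipFile(io.BytesIO(zip_bytes)) as unzipped:
    member_names = unzipped.namelist()
    wanted = "example.csv"
    # Fail early with a clear message if the expected member is missing.
    if wanted not in member_names:
        raise FileNotFoundError(f"{wanted!r} not in archive: {member_names}")
    with unzipped.open(wanted) as member:
        text = member.read().decode("utf-8")

print(member_names)
print(text.splitlines()[0])
```

The same `namelist()` check could be applied to `unzipped_file` before passing `data_source_file_name` to `open()`.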


## Load data into Pandas Dataframe:

In [4]:
pandas_df = pd.read_excel(unzipped_file.open(data_source_file_name))  # takes about 49 s to read the file
print(f"dataframe has been loaded into pandas dataframe \n with file: {data_source_file_name}\n with size: {pandas_df.shape}")

dataframe has been loaded into pandas dataframe 
 with file: Online Retail.xlsx
 with size: (541909, 8)


In [5]:
pandas_df.head(5)

|   | InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerID | Country |
|---|---|---|---|---|---|---|---|---|
| 0 | 536365 | 85123A | WHITE HANGING HEART T-LIGHT HOLDER | 6 | 2010-12-01 08:26:00 | 2.55 | 17850.0 | United Kingdom |
| 1 | 536365 | 71053 | WHITE METAL LANTERN | 6 | 2010-12-01 08:26:00 | 3.39 | 17850.0 | United Kingdom |
| 2 | 536365 | 84406B | CREAM CUPID HEARTS COAT HANGER | 8 | 2010-12-01 08:26:00 | 2.75 | 17850.0 | United Kingdom |
| 3 | 536365 | 84029G | KNITTED UNION FLAG HOT WATER BOTTLE | 6 | 2010-12-01 08:26:00 | 3.39 | 17850.0 | United Kingdom |
| 4 | 536365 | 84029E | RED WOOLLY HOTTIE WHITE HEART. | 6 | 2010-12-01 08:26:00 | 3.39 | 17850.0 | United Kingdom |


## Load data into Spark Dataframe:

In [6]:
spark_df = spark.createDataFrame(pandas_df)  # takes about 24 s to convert pandas_df into spark_df

In [7]:
display(spark_df)

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: bigint, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

# Step 3 - Exploratory Data Analysis:

In [8]:
row_count = spark_df.count()
print(f"number of rows in the dataframe: {row_count}")

number of rows in the dataframe: 541909


In [9]:
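# note: distinct() keeps a missing CustomerID as one value of its own,
# so this count may be one higher than the number of real customers
unique_customers = spark_df.select("CustomerID").distinct().count()
print(f"number of unique customers in the dataframe: {unique_customers}")

number of unique customers in the dataframe: 4373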


In [10]:
from pyspark.sql.functions import countDistinct, desc

print("Top 10 countries by number of distinct customers:")

# 'country_count' holds the number of distinct customers per country
spark_df.groupBy('Country')  \
        .agg(countDistinct('CustomerID').alias('country_count'))  \
        .orderBy(desc('country_count'))  \
        .show(10)

Top 10 countries by number of distinct customers:
+--------------+-------------+
|       Country|country_count|
+--------------+-------------+
|United Kingdom|         3951|
|       Germany|           95|
|        France|           88|
|         Spain|           31|
|       Belgium|           25|
|   Switzerland|           22|
|      Portugal|           20|
|         Italy|           15|
|       Finland|           12|
|       Austria|           11|
+--------------+-------------+
only showing top 10 rows
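
To make the aggregation above concrete, here is a plain-Python sketch of the same logic: group by country, count distinct customers, sort in descending order, and keep the top N. The rows are invented for illustration and are not taken from the dataset. `None` stands in for a null `CustomerID`, which `countDistinct` skips.

```python
from collections import defaultdict

# Hypothetical (Country, CustomerID) pairs; None stands in for a null CustomerID.
rows = [
    ("United Kingdom", 17850.0),
    ("United Kingdom", 17850.0),
    ("United Kingdom", 13047.0),
    ("France", 12583.0),
    ("France", None),
    ("Germany", 12662.0),
    ("Germany", 12471.0),
    ("Germany", 12472.0),
]

# Group by country and collect the distinct non-null customer IDs.
customers_by_country = defaultdict(set)
for country, customer_id in rows:
    ids = customers_by_country[country]  # the group exists even if all IDs are null
    if customer_id is not None:
        ids.add(customer_id)

# Count distinct customers per country, then keep the top 2 by count.
counts = {country: len(ids) for country, ids in customers_by_country.items()}
top_countries = sorted(counts.items(), key=lambda item: item[1], reverse=True)[:2]
print(top_countries)
```

A set per group plays the role of `countDistinct`, and the slice after sorting plays the role of `.show(10)` limited to two rows here.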



# Step 4 - Data Pre-processing:




