Install and Setup Spark

In [122]:
# 1) First: install Java, Spark and and run a local Spark session by just running this on Google Colab:
!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null   # !apt-get --> install java
#!wget -q https://dlcdn.apache.org/spark/spark-3.3.4/spark-3.3.4-bin-hadoop3.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz  # !tar --> like unzip
!pip install -q findspark  # !pip  --> instal a package, we cant import a library without installing it first, most libraries that we used were already installed

# 2) Set the locations where Spark and Java are installed to let know Colab where to find it.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"


# 3) Third: import spark libraries and use them
import findspark
findspark.init("spark-3.5.1-bin-hadoop3")
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
from pyspark.sql import Row
from pyspark.sql import functions

[33m0% [Working][0m            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
[33m0% [Connecting to security.ubuntu.com (185.125.190.36)] [Connected to cloud.r-project.org (108.157.1[0m                                                                                                    Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
[33m0% [2 InRelease 12.7 kB/128 kB 10%] [Waiting for headers] [Waiting for headers] [Connecting to ppa.l[0m                                                                                                    Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
[33m0% [2 InRelease 15.6 kB/128 kB 12%] [Waiting for headers] [Waiting for headers] [Connecting to ppa.l[0m                                                                                                    Hit:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
[33m0% [2 InRelease 15.6 kB/128 

In [123]:
# Request a session named "Basic"; getOrCreate() is expected to hand back the session
# that is already running rather than start a second one.
spark = (
    SparkSession.builder
    .appName("Basic")
    .getOrCreate()
)

Mount Google Drive

In [124]:
from google.colab import drive

# Folder under which Google Drive is exposed inside the Colab runtime.
drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Locate the churn_location_and_rating file

In [125]:
path = "/content/drive/MyDrive/Spark/Part C - Spark Project/churn_location_and_rating.csv"
!ls "/content/drive/MyDrive/Spark/Part C - Spark Project/churn_location_and_rating.csv"

'/content/drive/MyDrive/Spark/Part C - Spark Project/churn_location_and_rating.csv'


Import Necessary Libraries

In [126]:
!pip install pycountry



In [127]:
from pyspark.sql.types import (StructField, IntegerType,StringType, StructType)
from pyspark.sql.functions import when
from pyspark.sql.functions import countDistinct, avg
from pyspark.sql.functions import format_number
from pyspark.sql.functions import col, when, split, trim ,regexp_replace
import pycountry
import re
from pyspark.sql.functions import udf

Creating the Data Source

In [128]:
# Read the raw file through the `path` variable defined earlier instead of repeating the literal,
# so the location only has to be changed in one place.
df = spark.read.csv(path, header=True, inferSchema=True)

In [129]:
# Preview the freshly loaded rows to check how the file was parsed.
df.show()

+--------------------------+
|customerID;Location;Rating|
+--------------------------+
|      "id7044;nyc; new ...|
|      "id7045;stockton;...|
|      "id7046;moscow; y...|
|      "id7047;porto; v....|
|      "id7048;farnborou...|
|      "id7049;santa mon...|
|      "id7050;washingto...|
|      "id7051;timmins; ...|
|      "id7052;germantow...|
|      "id7053;albacete;...|
|      "id7054;melbourne...|
|      "id7055;fort brag...|
|      "id7056;barcelona...|
|      "id7057;mediapoli...|
|      "id7058;calgary; ...|
|      "id7059;albuquerq...|
|      "id7060;chesapeak...|
|      "id7061;rio de ja...|
|      "id7062;weston; ;...|
|      "id7063;langhorne...|
+--------------------------+
only showing top 20 rows



In [130]:
df.columns  # only one combined column came back; it still has to be split into separate fields

['customerID;Location;Rating']

Cleaning and prepping the data. Be careful: "n/a" values and blanks are not read as nulls.

In [131]:
# Give the lone column a working name so it can be parsed.
df = df.toDF("combined")

# Strip every double-quote character out of the raw line.
df = df.withColumn("combined", regexp_replace(col("combined"), '"', ''))

# Break the line on ';' and fan the pieces out into trimmed, named columns.
# Field positions: 0 = CustomerId, 1 = City, 2 = State, 3 = Rating, 4 = Country.
parts = split(df["combined"], ';')
for position, field_name in enumerate(["CustomerId", "City", "State", "Rating", "Country"]):
    df = df.withColumn(field_name, trim(parts.getItem(position)))
df = df.drop("combined")

# Text columns: null, blank, "n/a", or anything containing a character other than
# an ASCII letter or whitespace is replaced by "Unknown".
for text_col in ("City", "State", "Country"):
    value = col(text_col)
    is_unusable = (
        value.isNull()
        | (value == "")
        | (value == "n/a")
        | value.rlike(r'[^a-zA-Z\s]')
    )
    df = df.withColumn(text_col, when(is_unusable, "Unknown").otherwise(value))

# Rating: null, blank, or "n/a" becomes the sentinel -1.
rating = col("Rating")
rating_missing = rating.isNull() | (rating == "") | (rating == "n/a")
df = df.withColumn("Rating", when(rating_missing, -1).otherwise(rating))

# Two rows carry these non 1-to-10 rating values and nothing else useful
# (unknowns or random strings of letters), so they are removed; then exact duplicates are dropped.
df = df.filter(~col("Rating").isin(["&#25289", "&#24029"])).dropDuplicates()

# Preview the cleaned DataFrame.
df.show()

+----------+--------------+--------------+------+--------------+
|CustomerId|          City|         State|Rating|       Country|
+----------+--------------+--------------+------+--------------+
|    id7254|        lleida|        lleida|   1.0|         spain|
|    id7317|       bologna|emilia romagna|   5.0|         italy|
|    id7435|     barcelona|     catalunya|   6.0|         spain|
|    id7504|     singapore|       Unknown|   5.0|     singapore|
|    id7875|        mobile|       alabama|   8.0|united kingdom|
|    id7879|virginia beach|      virginia|   9.0|           usa|
|    id8184|      santiago|       Unknown|   3.0|         chile|
|    id8475|       roxbury| massachusetts|   2.0|           usa|
|    id8530|    saint paul|     minnesota|   5.0|           usa|
|    id8605|          reno|        nevada|   5.0|           usa|
|    id8649|   san antonio|         texas|   5.0|           usa|
|    id9055|          bray|       wicklow|   4.0|       Unknown|
|    id9395|       Unknow

Clean and Validate Country column

In [132]:
# Cache: lower-cased country name / alpha-2 / alpha-3 code -> canonical pycountry name.
# Left empty here and filled on first use by _country_index().
_COUNTRY_NAME_BY_KEY = {}


def _country_index():
    """Return the name/alpha-2/alpha-3 lookup table, building it on the first call."""
    if not _COUNTRY_NAME_BY_KEY:
        for country_info in pycountry.countries:
            for key in (country_info.name, country_info.alpha_2, country_info.alpha_3):
                # setdefault keeps the first country seen for a key, which matches the
                # result of scanning pycountry.countries in order and stopping at the first hit.
                _COUNTRY_NAME_BY_KEY.setdefault(key.lower(), country_info.name)
    return _COUNTRY_NAME_BY_KEY


def getcountry(country):
    """Map a raw country string to its canonical pycountry name.

    Args:
        country: Raw country text (a name, alpha-2 code, or alpha-3 code), or None.

    Returns:
        The pycountry ``name`` of the matching country, or "Unknown" when the
        input is empty/None or nothing matches.
    """
    if not country:
        return "Unknown"

    # Drop everything except ASCII letters and whitespace, then normalise case and padding.
    cleaned = re.sub(r'[^a-zA-Z\s]', '', country).strip().lower()
    if not cleaned:
        # Nothing left to look up (e.g. the input was only digits or punctuation).
        return "Unknown"

    # One dictionary lookup replaces a scan over every country for every row.
    return _country_index().get(cleaned, "Unknown")

# Wrap getcountry as a Spark UDF that returns strings.
getcountry_udf = udf(getcountry, StringType())

# Run Country through the UDF; City and State are only whitespace-trimmed, not validated.
df = df.withColumn("Country", getcountry_udf(col("Country")))
for plain_col in ("City", "State"):
    df = df.withColumn(plain_col, trim(col(plain_col)))

# Preview the result.
df.show()

+----------+--------------+--------------+------+--------------+
|CustomerId|          City|         State|Rating|       Country|
+----------+--------------+--------------+------+--------------+
|    id7254|        lleida|        lleida|   1.0|         Spain|
|    id7317|       bologna|emilia romagna|   5.0|         Italy|
|    id7435|     barcelona|     catalunya|   6.0|         Spain|
|    id7504|     singapore|       Unknown|   5.0|     Singapore|
|    id7875|        mobile|       alabama|   8.0|United Kingdom|
|    id7879|virginia beach|      virginia|   9.0| United States|
|    id8184|      santiago|       Unknown|   3.0|         Chile|
|    id8475|       roxbury| massachusetts|   2.0| United States|
|    id8530|    saint paul|     minnesota|   5.0| United States|
|    id8605|          reno|        nevada|   5.0| United States|
|    id8649|   san antonio|         texas|   5.0| United States|
|    id9055|          bray|       wicklow|   4.0|       Unknown|
|    id9395|       Unknow

Save the cleaned CSV to Google Drive

In [133]:
# Write the cleaned data into the target folder as a single partition.
# mode("overwrite") replaces output left by an earlier run; with the default save mode the
# write raises once the folder exists, so the notebook could not be re-run from the top.
df.coalesce(1).write.mode("overwrite").csv("/content/drive/MyDrive/Spark/Part C - Spark Project/clean_churn_location_rating.csv", header=True)

Convert to Pandas dataframe for one clean CSV

In [134]:
import pandas as pd

# Folder of part files written by Spark, and the single-file CSV to produce from it.
clean_parts_dir = "/content/drive/MyDrive/Spark/Part C - Spark Project/clean_churn_location_rating.csv"
single_csv_file = "/content/drive/MyDrive/Spark/Part C - Spark Project/clean_churn_location_rating_single.csv"

# Load the part files back into Spark, letting it infer the column types.
df = spark.read.csv(clean_parts_dir, header=True, inferSchema=True)

# Collect the data into pandas and write it out as one CSV without the index column.
pandas_df = df.toPandas()
pandas_df.to_csv(single_csv_file, index=False)

Stop Spark

In [135]:
# Shut down the Spark session created at the top of the notebook now that all outputs are written.
spark.stop()