
# **Unlock the Potential of PySpark DataFrames: Hands-on Tips and Personalizations**

`University of East London, Docklands Campus, 2024-25`

`module name`: **`Machine Learning on Big Data (CN7030) - MSc AI&DS`**

`Author`: **`Dr Amin Karami (PG Academic Lead in CDT School)`**

`E`: **`a.karami@uel.ac.uk`**

`W`: **`http://www.aminkarami.com/`**

---

**DataFrame (DF)**: schema (named, typed columns) + declarative language. A DataFrame is a Dataset organized into named columns and is conceptually equivalent to a table in a relational database. Because Spark knows the schema, it can optimize queries on a DataFrame, which makes it very efficient for structured data.

data: https://drive.google.com/file/d/1HiP_TkWYClAmzhhOFzhXdOXTfZb9DoB_/view?usp=drive_link (641MB)

source: https://spark.apache.org/docs/latest/sql-programming-guide.html

source: https://spark.apache.org/docs/latest/api/python/reference/

# **Section 1: Initialize PySpark**

In [2]:
!pip3 install pyspark



# **System Check**

In [3]:
# check free system memory
!free -h

               total        used        free      shared  buff/cache   available
Mem:            12Gi       787Mi       7.0Gi       1.0Mi       4.9Gi        11Gi
Swap:             0B          0B          0B


In [4]:
# number of available CPU cores
!nproc

2


# **Linking with Spark**

In [5]:
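# Linking with Spark
import pyspark
from pyspark.sql import SparkSession

# local[*] runs Spark inside a single JVM with one worker thread per core.
# In local mode the executors live inside the driver, so spark.driver.memory is
# the setting that bounds memory; the spark.executor.* values have little effect here.
spark = SparkSession.builder \
                    .appName("Tutorial2_CN7030") \
                    .master("local[*]") \
                    .config("spark.executor.memory", "4g") \
                    .config("spark.driver.memory", "2g") \
                    .config("spark.executor.cores", "2") \
                    .config("spark.sql.inMemoryColumnarStorage.compressed", "true") \
                    .getOrCreate()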

spark

# **Connect to the Google Drive**

In [7]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# **Section 2: Create PySpark DataFrame from CSV**

In [8]:
df = spark.read.format('csv').load("/content/drive/MyDrive/data.csv", header=True, inferSchema=True)

#show table
df.show(truncate=True)

#show schema
df.printSchema()

#some info
print(df.count())
print(len(df.columns))

+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|                  id|               age|salary|             score|sales|height|weight|gender|category|income|expenses|
+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|  0.8017532427858894| 79.71301351894658|     4| 37.10369234532471|   47|    62|     5|Female|       B|     8|      85|
|  0.6565552949992319| 24.22600602366628|     6| 79.75769828959885|    8|    30|    42|  Male|       A|    70|      85|
|  0.2515595782593636| 70.97364852287149|    17| 71.71375356646551|   82|     1|    72|  Male|       A|    66|      53|
|  0.2073428376111074| 45.09378549789149|    32|              NULL|   59|    12|    21|  Male|       B|    56|       2|
|  0.6392921379278927|30.357970527906065|    13|              NULL|   22|    69|    58|Female|       A|     0|      74|
|  0.8505582285081454|              NULL
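With `inferSchema=True`, Spark reads the file an extra time to guess each column's type (the CSV reader documentation notes that it requires one extra pass over the data). The plain-Python sketch below is a simplified model of that idea, not Spark's implementation: it only distinguishes int, double and string, treats empty fields as null, and widens a column's type as it scans the rows. For a 641MB file, passing an explicit `schema` (a `StructType` or a DDL-formatted string) avoids the extra pass.

```python
import csv
import io

# Simplified widening order (an assumption: Spark's real inference also
# handles longs, decimals, timestamps, booleans, and more).
WIDENING = ["null", "int", "double", "string"]

def value_type(text: str) -> str:
    """Classify one CSV field; an empty field counts as null."""
    if text == "":
        return "null"
    try:
        int(text)
        return "int"
    except ValueError:
        pass
    try:
        float(text)
        return "double"
    except ValueError:
        return "string"

def infer_schema(csv_text: str) -> dict:
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    types = {name: "null" for name in header}
    for row in reader:  # a full scan of the data, like inferSchema=True
        for name, field in zip(header, row):
            seen = value_type(field)
            # keep whichever type is wider in the null < int < double < string order
            if WIDENING.index(seen) > WIDENING.index(types[name]):
                types[name] = seen
    # columns that were entirely empty fall back to string
    return {name: ("string" if t == "null" else t) for name, t in types.items()}

sample = "id,age,salary,gender\n0.80,79.7,4,Female\n0.65,,6,Male\n"
print(infer_schema(sample))
# {'id': 'double', 'age': 'double', 'salary': 'int', 'gender': 'string'}
```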

# How many partitions?

In [9]:
df.rdd.getNumPartitions()

6
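The 6 partitions are consistent with Spark's default file-splitting rule. A rough reconstruction of the arithmetic, assuming the defaults `spark.sql.files.maxPartitionBytes` = 128MB and `spark.sql.files.openCostInBytes` = 4MB were not changed, that the default parallelism equals the 2 cores reported by `nproc`, and that the 641MB shown by Google Drive is about 641 MiB (if it were 641 million bytes, the same formula would give 5):

```python
import math

MiB = 1024 * 1024

# Spark defaults (assumed unchanged in this notebook)
max_partition_bytes = 128 * MiB   # spark.sql.files.maxPartitionBytes
open_cost_in_bytes = 4 * MiB      # spark.sql.files.openCostInBytes
default_parallelism = 2           # local[*] on the 2 cores reported by nproc

# Assumption: the file is roughly 641 MiB
file_size = 641 * MiB

# Simplified split-size rule: each file is padded by the open cost,
# then the split size is capped by maxPartitionBytes
total_bytes = file_size + open_cost_in_bytes
bytes_per_core = total_bytes // default_parallelism
max_split_bytes = min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))

# one large file is cut into chunks of at most max_split_bytes,
# and here each chunk becomes its own partition
num_partitions = math.ceil(file_size / max_split_bytes)
print(max_split_bytes // MiB, num_partitions)  # 128 6
```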

# Let's change the number of partitions (by count, by column, and by range)

In [10]:
df2 = df.repartition(8)
df2.rdd.getNumPartitions()

8

In [11]:
df3 = df.repartition('category')
df3.rdd.getNumPartitions()

2
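`repartition('category')` uses hash partitioning: each row goes to partition `hash(category) mod n`, so all rows with the same key land together. With no explicit count, Spark starts from `spark.sql.shuffle.partitions` (200 by default); adaptive query execution, enabled by default in recent Spark versions, may then coalesce small shuffle partitions, which is a plausible explanation for the 2 seen here. With an explicit count, as in `df4`, there are 6 partitions but only two distinct categories, so at most 2 of them can hold data. The sketch below uses `zlib.crc32` as a deterministic stand-in for Spark's Murmur3 hash, so the exact bucket numbers will differ from Spark's:

```python
import zlib

def hash_partition(key: str, num_partitions: int) -> int:
    # crc32 stands in for Spark's Murmur3 hash: any deterministic hash
    # gives the same "same key -> same partition" behaviour
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def partition_by_key(rows, key, num_partitions):
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash_partition(row[key], num_partitions)].append(row)
    return partitions

# toy rows with only two distinct categories, like the notebook's 'category' column
rows = [{"id": i, "category": "A" if i % 2 else "B"} for i in range(20)]
parts = partition_by_key(rows, "category", 6)

non_empty = [i for i, p in enumerate(parts) if p]
print(f"{len(parts)} partitions, {len(non_empty)} non-empty: {non_empty}")
```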

In [12]:
df4 = df.repartition(6, 'category')
df4.rdd.getNumPartitions()

6

In [13]:
df5 = df.repartitionByRange(10, "income")
df5.rdd.getNumPartitions()

10
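`repartitionByRange` instead gives each partition a contiguous, non-overlapping range of `income` values. Because exact boundaries would require sorting the whole column, Spark estimates them from a sample (the PySpark documentation notes the result may therefore not be consistent between runs). A simplified plain-Python model of the idea, on toy data:

```python
import bisect
import random

def range_bounds(sample, num_partitions):
    """Choose num_partitions - 1 split points at evenly spaced positions of the sorted sample."""
    ordered = sorted(sample)
    step = len(ordered) / num_partitions
    return [ordered[int(step * i)] for i in range(1, num_partitions)]

def range_partition(values, bounds):
    parts = [[] for _ in range(len(bounds) + 1)]
    for v in values:
        # bisect_left: a value equal to a bound goes to the lower-numbered partition
        parts[bisect.bisect_left(bounds, v)].append(v)
    return parts

random.seed(0)
income = [random.randint(0, 99) for _ in range(10_000)]   # toy data, not the notebook's
bounds = range_bounds(random.sample(income, 500), 10)        # boundaries estimated from a sample
parts = range_partition(income, bounds)

print("bounds:", bounds)
print("rows per partition:", [len(p) for p in parts])
```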

# Write the DF to disk in a partitioned manner

In [14]:
df5.write.option('header', True) \
          .partitionBy('gender') \
          .mode('overwrite') \
          .csv('df5_repartion_gender')

In [15]:
df4.write.option('header', True) \
          .partitionBy('gender') \
          .mode('overwrite') \
          .csv('df4_repartion_gender')

In [16]:
df5.repartition(3).write.option('header', True) \
                  .partitionBy('gender') \
                  .mode('overwrite') \
                  .csv('df5_repartion_gender_2')

In [17]:
df4.write.option('header', True) \
         .partitionBy("gender") \
         .mode("overwrite") \
         .csv("gender_df4")

# Optional: cap each output file at 500,000 rows (the default, 0, means no limit)
# spark.conf.set("spark.sql.files.maxRecordsPerFile", "500000")
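When writing with `partitionBy('gender')`, each write task creates a separate file in every `gender=<value>` directory for which it holds rows, and the `gender` value is encoded in the directory name rather than stored inside the files. That is why the partitioning before the write matters: `df5` (10 range partitions on `income`, each likely containing both genders) should produce up to 10 files per gender directory, while `df4` (at most 2 non-empty partitions) produces at most 2. The sketch below is a simplified model of this file count, including the effect of the commented-out `spark.sql.files.maxRecordsPerFile` setting:

```python
import math
from collections import Counter

def files_written(tasks, max_records_per_file=0):
    """Simplified count of output files for df.write.partitionBy(<column>).

    tasks: one list per write task, holding the partition-column value of each row.
    max_records_per_file: 0 (the default) means no per-file row limit.
    """
    total = 0
    for task in tasks:
        for n_rows in Counter(task).values():   # rows per partition value in this task
            if max_records_per_file > 0:
                total += math.ceil(n_rows / max_records_per_file)
            else:
                total += 1                       # one file per value per task
    return total

# toy example: three write tasks holding gender values
tasks = [
    ["Male", "Female"] * 4,          # 4 Male + 4 Female
    ["Male"] * 5 + ["Female"] * 3,
    ["Female"] * 6,                  # only one gender in this task
]
print(files_written(tasks))                          # 5
print(files_written(tasks, max_records_per_file=3))  # 9
```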

In [18]:
df5.coalesce(6).write.option('header', True) \
         .partitionBy("gender", "category") \
         .mode("overwrite") \
         .csv("gender_df5_twoPartitons")
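`coalesce(6)` and `repartition(3)` both change the partition count but work differently: `coalesce` can only reduce the count and merges existing partitions without a shuffle (asking for more partitions than exist leaves the count unchanged), whereas `repartition(n)` without columns performs a full shuffle that spreads rows round-robin. A toy model of both, where the contiguous grouping in `coalesce` is a simplification (Spark may also group partitions by data locality):

```python
def coalesce(partitions, n):
    """Merge existing partitions into at most n groups without moving rows between them."""
    n = min(n, len(partitions))            # coalesce never increases the partition count
    groups = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        groups[i * n // len(partitions)].extend(part)   # neighbouring partitions are merged
    return groups

def repartition_round_robin(partitions, n):
    """Full shuffle: deal every row out to n partitions in turn."""
    groups = [[] for _ in range(n)]
    rows = [row for part in partitions for row in part]
    for i, row in enumerate(rows):
        groups[i % n].append(row)
    return groups

# 10 toy partitions of 3 rows, like df5 after repartitionByRange(10, "income");
# each row records the id of the partition it started in
df5_parts = [[p] * 3 for p in range(10)]

print([len(g) for g in coalesce(df5_parts, 6)])                 # [6, 6, 3, 6, 6, 3]
print([len(g) for g in repartition_round_robin(df5_parts, 3)])  # [10, 10, 10]
print(len(coalesce(df5_parts, 20)))                             # 10
```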



# **Section 3: DataFrame Operations and Transformations**

In [19]:
df2.select('gender', 'income','age').show(5)

+------+------+-----------------+
|gender|income|              age|
+------+------+-----------------+
|  Male|    67| 72.5729444487239|
|Female|    22|79.65777311571274|
|  Male|    60|83.53864267645021|
|Female|    38|40.53360311225305|
|  Male|    80|72.95804039716336|
+------+------+-----------------+
only showing top 5 rows



In [20]:
df4.select('gender', 'salary','age').show(5)

+------+------+------------------+
|gender|salary|               age|
+------+------+------------------+
|  Male|     6| 24.22600602366628|
|  Male|    17| 70.97364852287149|
|Female|    13|30.357970527906065|
|Female|    37| 57.94047153077633|
|Female|     0|              NULL|
+------+------+------------------+
only showing top 5 rows



In [21]:
df4.filter((df4.age>30)&(df4.salary>5)).show(5)

+-------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|                 id|               age|salary|             score|sales|height|weight|gender|category|income|expenses|
+-------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
| 0.2515595782593636| 70.97364852287149|    17| 71.71375356646551|   82|     1|    72|  Male|       A|    66|      53|
| 0.6392921379278927|30.357970527906065|    13|              NULL|   22|    69|    58|Female|       A|     0|      74|
| 0.7555506990689408| 57.94047153077633|    37| 99.05156945373632|   18|    67|    94|Female|       A|    91|      89|
|0.07531261247891552| 78.00738662272417|    44|57.377461450339794|   32|    83|    68|  Male|       A|     3|      35|
| 0.6088087252200041| 46.04202629270567|    20| 82.62279087618317|   83|    39|    26|  Male|       A|    10|      58|
+-------------------+------------------+------+-
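The parentheses around each condition in `(df4.age>30)&(df4.salary>5)` are required. PySpark uses `&`, `|` and `~` for AND, OR and NOT because Python's `and`/`or`/`not` cannot be overloaded, and `&` binds more tightly than `>`. Without parentheses, Python reads the expression as the chained comparison `age > (30 & salary) > 5`, which forces a `Column` into a boolean and makes PySpark raise a `ValueError`. Python's own `ast` module (3.9+ for `ast.unparse`) shows the two parse trees:

```python
import ast

# Without parentheses, '&' binds tighter than '>', giving a chained comparison
unparenthesised = ast.parse("age > 30 & salary > 5", mode="eval").body
# With parentheses, '&' combines two separate comparisons
parenthesised = ast.parse("(age > 30) & (salary > 5)", mode="eval").body

print(type(unparenthesised).__name__, ast.unparse(unparenthesised.comparators[0]))  # Compare 30 & salary
print(type(parenthesised).__name__, type(parenthesised.op).__name__)                # BinOp BitAnd
```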

In [22]:
# note: this import shadows Python's built-in round() and sum() for the rest of the notebook
from pyspark.sql.functions import mean, round, sum
df4.groupBy('category').agg(
    round(mean('salary'),2).alias('mean_salary'),
    round(sum('salary'),2).alias('sum_salary')
).show(10)

+--------+-----------+----------+
|category|mean_salary|sum_salary|
+--------+-----------+----------+
|       A|      49.49| 247412615|
|       B|      49.51| 247582007|
+--------+-----------+----------+



In [23]:
df4.sort('height',"score", ascending=False).show(10)

+--------------------+------------------+------+-----------------+-----+------+------+------+--------+------+--------+
|                  id|               age|salary|            score|sales|height|weight|gender|category|income|expenses|
+--------------------+------------------+------+-----------------+-----+------+------+------+--------+------+--------+
| 0.16933902279650814|              NULL|    37|99.99842061516966|   63|    99|    78|Female|       B|    60|      36|
|  0.4414179206080756| 47.31491599580799|    17|99.99629934376922|   96|    99|    53|Female|       A|    13|      63|
|  0.6365465779061935| 75.97020871674081|    59|99.99605629873773|   56|    99|    82|  Male|       A|    36|      64|
|   0.137572774067943| 46.72427635576165|    16|99.99478454051689|   14|    99|    68|Female|       B|    20|      47|
|  0.7188334449042442|              NULL|    24|99.99452559017527|    5|    99|    66|Female|       B|    73|      92|
|  0.6186372186898206| 68.40377455302632|     9|
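`sort('height', 'score', ascending=False)` orders by `height` first and uses `score` only to break ties, both descending; `ascending` also accepts a list such as `[False, True]` for mixed directions. In Spark's descending order, nulls come last by default. A plain-Python equivalent on a few hypothetical rows, where the key tuple mimics those rules:

```python
rows = [  # hypothetical rows; height is assumed never null, as in the notebook's data
    {"height": 99, "score": 99.99},
    {"height": 98, "score": 100.0},
    {"height": 99, "score": 50.0},
    {"height": 99, "score": None},
]

def descending_nulls_last(value):
    # (is_null, -value): non-null values sort first, largest first; nulls go last
    return (value is None, -value if value is not None else 0.0)

ordered = sorted(rows, key=lambda r: (-r["height"], descending_nulls_last(r["score"])))
for r in ordered:
    print(r["height"], r["score"])
# 99 99.99
# 99 50.0
# 99 None
# 98 100.0
```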

In [24]:
# rename two columns
renamed_df=df4.withColumnRenamed("income","annual_income") \
              .withColumnRenamed("score", "annual_score")
renamed_df.show(10)

+-------------------+------------------+------+------------------+-----+------+------+------+--------+-------------+--------+
|                 id|               age|salary|      annual_score|sales|height|weight|gender|category|annual_income|expenses|
+-------------------+------------------+------+------------------+-----+------+------+------+--------+-------------+--------+
| 0.6565552949992319| 24.22600602366628|     6| 79.75769828959885|    8|    30|    42|  Male|       A|           70|      85|
| 0.2515595782593636| 70.97364852287149|    17| 71.71375356646551|   82|     1|    72|  Male|       A|           66|      53|
| 0.6392921379278927|30.357970527906065|    13|              NULL|   22|    69|    58|Female|       A|            0|      74|
| 0.7555506990689408| 57.94047153077633|    37| 99.05156945373632|   18|    67|    94|Female|       A|           91|      89|
|0.34380469538701885|              NULL|     0| 68.12994102604317|   50|    66|    29|Female|       A|           39|  

In [25]:
# drop the id and height columns
reduced_df = df4.drop("id", "height")
reduced_df.show(10)

+------------------+------+------------------+-----+------+------+--------+------+--------+
|               age|salary|             score|sales|weight|gender|category|income|expenses|
+------------------+------+------------------+-----+------+------+--------+------+--------+
| 24.22600602366628|     6| 79.75769828959885|    8|    42|  Male|       A|    70|      85|
| 70.97364852287149|    17| 71.71375356646551|   82|    72|  Male|       A|    66|      53|
|30.357970527906065|    13|              NULL|   22|    58|Female|       A|     0|      74|
| 57.94047153077633|    37| 99.05156945373632|   18|    94|Female|       A|    91|      89|
|              NULL|     0| 68.12994102604317|   50|    29|Female|       A|    39|      10|
| 78.00738662272417|    44|57.377461450339794|   32|    68|  Male|       A|     3|      35|
| 5.739474360847097|    60|              NULL|   69|    59|Female|       A|    89|      94|
|29.232534249025754|    33| 72.17303870051973|   19|     5|Female|       A|     

# **Section 4: Working with Missing Data**

In [27]:
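from pyspark.sql.functions import col, isnan, when, count, sum  # sum here is pyspark's aggregate, not Python's built-in

# count nulls per column: isNull() gives true/false, cast to 0/1, then sum over all rows
missing_values = df4.select([sum(col(c).isNull().cast('int')).alias(c) for c in df4.columns])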
missing_values.show()

+---+-------+------+-------+-----+------+------+------+--------+------+--------+
| id|    age|salary|  score|sales|height|weight|gender|category|income|expenses|
+---+-------+------+-------+-----+------+------+------+--------+------+--------+
|  0|4000499|     0|4001316|    0|     0|     0|     0|       0|     0|       0|
+---+-------+------+-------+-----+------+------+------+--------+------+--------+
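The cell counts nulls by turning `isNull()` into 0/1 and summing per column. A null is not the same as a floating-point NaN: `isNull()` does not match NaN, which is what `isnan` (imported above but unused) is for. A small plain-Python model of the distinction, on hypothetical rows:

```python
import math

rows = [  # hypothetical rows
    {"age": 24.2, "score": 79.8},
    {"age": None, "score": 71.7},
    {"age": 30.4, "score": float("nan")},
    {"age": None, "score": None},
]

def count_missing(rows, columns):
    nulls = dict.fromkeys(columns, 0)
    nans = dict.fromkeys(columns, 0)
    for row in rows:
        for c in columns:
            value = row[c]
            if value is None:
                nulls[c] += 1      # what col(c).isNull() matches
            elif isinstance(value, float) and math.isnan(value):
                nans[c] += 1       # what isnan(col(c)) matches
    return nulls, nans

nulls, nans = count_missing(rows, ["age", "score"])
print(nulls)  # {'age': 2, 'score': 1}
print(nans)   # {'age': 0, 'score': 1}
```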



In [28]:
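df_copy = df4.select("*")        # optional: DataFrames are immutable, so dropna() never modifies df4 itself
df_cleaned = df_copy.dropna()    # drop every row that has a null in any column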
df_cleaned.show()

+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|                  id|               age|salary|             score|sales|height|weight|gender|category|income|expenses|
+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|  0.6565552949992319| 24.22600602366628|     6| 79.75769828959885|    8|    30|    42|  Male|       A|    70|      85|
|  0.2515595782593636| 70.97364852287149|    17| 71.71375356646551|   82|     1|    72|  Male|       A|    66|      53|
|  0.7555506990689408| 57.94047153077633|    37| 99.05156945373632|   18|    67|    94|Female|       A|    91|      89|
| 0.07531261247891552| 78.00738662272417|    44|57.377461450339794|   32|    83|    68|  Male|       A|     3|      35|
|   0.635481528615935|29.232534249025754|    33| 72.17303870051973|   19|    42|     5|Female|       A|     1|      11|
|  0.6088087252200041| 46.04202629270567

In [30]:
from pyspark.sql.functions import col, isnan, when, count, sum  # pyspark's sum, not Python's built-in

missing_values = df_cleaned.select([sum(col(c).isNull().cast('Int')).alias(c) for c in df_cleaned.columns])
missing_values.show()

+---+---+------+-----+-----+------+------+------+--------+------+--------+
| id|age|salary|score|sales|height|weight|gender|category|income|expenses|
+---+---+------+-----+-----+------+------+------+--------+------+--------+
|  0|  0|     0|    0|    0|     0|     0|     0|       0|     0|       0|
+---+---+------+-----+-----+------+------+------+--------+------+--------+



In [31]:
df_cleaned.count()

3598970

In [32]:
# drop rows that have a null in any of the selected columns
columns_to_check = ["age","salary"]
df_copy = df4.select("*")
df_cleaned = df_copy.dropna(subset=columns_to_check)
df_cleaned.count()

5999501
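The counts above fit together. Because `salary` has no nulls, `dropna(subset=['age', 'salary'])` removes exactly the rows with a null `age`, so the table has 5,999,501 + 4,000,499 = 10,000,000 rows, with about 40% missing in both `age` and `score`. Assuming the two columns' nulls occur independently (an assumption, not something stated in the source), the expected number of fully complete rows is the product of the per-column completeness rates, which lands very close to the 3,598,970 rows left by `dropna()`:

```python
null_age = 4_000_499       # nulls in age (missing-value count)
null_score = 4_001_316     # nulls in score (missing-value count)
kept_subset = 5_999_501    # dropna(subset=["age", "salary"]).count()
kept_all = 3_598_970       # dropna().count()

# salary has no nulls, so the subset dropna removed exactly the null-age rows
total_rows = kept_subset + null_age

# Assumption: nulls in age and score occur independently of each other
p_age_present = 1 - null_age / total_rows
p_score_present = 1 - null_score / total_rows
expected_complete = total_rows * p_age_present * p_score_present

relative_gap = abs(expected_complete - kept_all) / kept_all
print(f"total rows: {total_rows:,}")                     # total rows: 10,000,000
print(f"expected complete rows: {expected_complete:,.0f}")
print(f"observed complete rows: {kept_all:,} (gap {relative_gap:.4%})")
```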

In [33]:
# fill missing values with a constant per column
df_copy = df4.select("*")
df_filled = df_copy.fillna({'age':30,'score':50})
df_filled.show(30)

+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|                  id|               age|salary|             score|sales|height|weight|gender|category|income|expenses|
+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|  0.6565552949992319| 24.22600602366628|     6| 79.75769828959885|    8|    30|    42|  Male|       A|    70|      85|
|  0.2515595782593636| 70.97364852287149|    17| 71.71375356646551|   82|     1|    72|  Male|       A|    66|      53|
|  0.6392921379278927|30.357970527906065|    13|              50.0|   22|    69|    58|Female|       A|     0|      74|
|  0.7555506990689408| 57.94047153077633|    37| 99.05156945373632|   18|    67|    94|Female|       A|    91|      89|
| 0.34380469538701885|              30.0|     0| 68.12994102604317|   50|    66|    29|Female|       A|    39|      10|
| 0.07531261247891552| 78.00738662272417

In [34]:
from pyspark.ml.feature import Imputer

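# fill nulls (and NaNs) in age and score with each column's mean, writing the results to new columns
imputer = Imputer(inputCols=["age", "score"], outputCols=["age_imputed", "score_imputed"], strategy="mean")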
df_imputed = imputer.fit(df4).transform(df4)
df_imputed.show(40)

+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+------------------+------------------+
|                  id|               age|salary|             score|sales|height|weight|gender|category|income|expenses|       age_imputed|     score_imputed|
+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+------------------+------------------+
|  0.6565552949992319| 24.22600602366628|     6| 79.75769828959885|    8|    30|    42|  Male|       A|    70|      85| 24.22600602366628| 79.75769828959885|
|  0.2515595782593636| 70.97364852287149|    17| 71.71375356646551|   82|     1|    72|  Male|       A|    66|      53| 70.97364852287149| 71.71375356646551|
|  0.6392921379278927|30.357970527906065|    13|              NULL|   22|    69|    58|Female|       A|     0|      74|30.357970527906065| 49.99490561227385|
|  0.7555506990689408| 57.94047153077633|    37| 99.
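`Imputer` with `strategy="mean"` fills each missing value with the mean of the non-missing values in that column (it treats nulls and, by default, NaN as missing), which is why the null `score` above became 49.99490561227385. A minimal plain-Python sketch of mean imputation on toy values:

```python
import math

def mean_impute(values):
    """Replace None/NaN with the mean of the remaining values (mean strategy)."""
    present = [v for v in values if v is not None and not math.isnan(v)]
    fill_value = math.fsum(present) / len(present)
    imputed = [fill_value if (v is None or math.isnan(v)) else v for v in values]
    return imputed, fill_value

ages = [24.0, None, 30.0, float("nan"), 36.0]   # toy values
imputed, fill_value = mean_impute(ages)
print(fill_value)  # 30.0
print(imputed)     # [24.0, 30.0, 30.0, 30.0, 36.0]
```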

In [37]:


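from pyspark.sql.functions import format_number, col

# format_number rounds (HALF_EVEN) and returns a *string* with thousands separators
# (e.g. "1,234"), so it is for display only; use pyspark's round() to keep the columns numeric
df_imputed = df_imputed.withColumn("age_imputed", format_number(col("age_imputed"), 0))
df_imputed = df_imputed.withColumn("score_imputed", format_number(col("score_imputed"), 3))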

df_imputed.show(30)



+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+-----------+-------------+
|                  id|               age|salary|             score|sales|height|weight|gender|category|income|expenses|age_imputed|score_imputed|
+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+-----------+-------------+
|  0.6565552949992319| 24.22600602366628|     6| 79.75769828959885|    8|    30|    42|  Male|       A|    70|      85|         24|       79.758|
|  0.2515595782593636| 70.97364852287149|    17| 71.71375356646551|   82|     1|    72|  Male|       A|    66|      53|         71|       71.714|
|  0.6392921379278927|30.357970527906065|    13|              NULL|   22|    69|    58|Female|       A|     0|      74|         30|       49.995|
|  0.7555506990689408| 57.94047153077633|    37| 99.05156945373632|   18|    67|    94|Female|       A|    91|      89|     

In [39]:


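# advanced: replace missing values with custom logic using a UDF (user-defined function)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def custom_fill(value):
    # the UDF is declared to return StringType, so convert non-null scores to strings
    if value is None:
        return "unknown"
    return str(value)

# a built-in alternative that avoids Python UDF overhead:
# coalesce(col("score").cast("string"), lit("unknown"))
custom_fill_udf = udf(custom_fill, StringType())
df_filled = df4.withColumn("score_filled", custom_fill_udf(df4.score))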

df_filled.show(5)



+-------------------+------------------+------+-----------------+-----+------+------+------+--------+------+--------+-----------------+
|                 id|               age|salary|            score|sales|height|weight|gender|category|income|expenses|     score_filled|
+-------------------+------------------+------+-----------------+-----+------+------+------+--------+------+--------+-----------------+
| 0.6565552949992319| 24.22600602366628|     6|79.75769828959885|    8|    30|    42|  Male|       A|    70|      85|79.75769828959885|
| 0.2515595782593636| 70.97364852287149|    17|71.71375356646551|   82|     1|    72|  Male|       A|    66|      53|71.71375356646551|
| 0.6392921379278927|30.357970527906065|    13|             NULL|   22|    69|    58|Female|       A|     0|      74|          unknown|
| 0.7555506990689408| 57.94047153077633|    37|99.05156945373632|   18|    67|    94|Female|       A|    91|      89|99.05156945373632|
|0.34380469538701885|              NULL|     0|6