<a href="https://colab.research.google.com/github/dpalacioj/pyspark-getting-started/blob/main/PySpark_Tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Upload Drive and Spark System



In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
%%time
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

CPU times: user 339 ms, sys: 41.9 ms, total: 381 ms
Wall time: 44.2 s


## Set Enviroment Variables



In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [4]:
!ls

drive  sample_data  spark-3.5.1-bin-hadoop3  spark-3.5.1-bin-hadoop3.tgz


In [5]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate() # Always to create a Session
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

# Uploading Data

In [6]:
path = r"/content/drive/MyDrive/projects/spark-learn/tips.csv"

In [7]:
df_pyspark=spark.read.csv(path)

In [None]:
df_pyspark

_c0,_c1,_c2,_c3,_c4,_c5,_c6
total_bill,tip,sex,smoker,day,time,size
16.99,1.01,Female,No,Sun,Dinner,2
10.34,1.66,Male,No,Sun,Dinner,3
21.01,3.5,Male,No,Sun,Dinner,3
23.68,3.31,Male,No,Sun,Dinner,2
24.59,3.61,Female,No,Sun,Dinner,4
25.29,4.71,Male,No,Sun,Dinner,4
8.77,2.0,Male,No,Sun,Dinner,2
26.88,3.12,Male,No,Sun,Dinner,4
15.04,1.96,Male,No,Sun,Dinner,2


In [None]:
print(type(df_pyspark))

<class 'pyspark.sql.dataframe.DataFrame'>


In [None]:
df_pyspark.printSchema()

root
 |-- total_bill: string (nullable = true)
 |-- tip: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: string (nullable = true)



[CSV files in Spark](https://spark.apache.org/docs/latest/sql-data-sources-csv.html)

In [None]:
df_pyspark = spark.read.option('header', 'true').csv(path, inferSchema=True)

In [None]:
# Check the schema

df_pyspark.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [None]:
df_pyspark.show(3)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
+----------+----+------+------+---+------+----+
only showing top 3 rows



In [None]:
type(df_pyspark)

# Selecting Columns and Indexing

In [None]:
df_pyspark.head(3) # Returns data structure in a list format

[Row(total_bill=16.99, tip=1.01, sex='Female', smoker='No', day='Sun', time='Dinner', size=2),
 Row(total_bill=10.34, tip=1.66, sex='Male', smoker='No', day='Sun', time='Dinner', size=3),
 Row(total_bill=21.01, tip=3.5, sex='Male', smoker='No', day='Sun', time='Dinner', size=3)]

In [None]:
df_pyspark.select('sex').show(2)

+------+
|   sex|
+------+
|Female|
|  Male|
+------+
only showing top 2 rows



In [None]:
df_pyspark.select(['total_bill', 'tip']).show(3)

+----------+----+
|total_bill| tip|
+----------+----+
|     16.99|1.01|
|     10.34|1.66|
|     21.01| 3.5|
+----------+----+
only showing top 3 rows



In [None]:
df_pyspark.dtypes

[('total_bill', 'double'),
 ('tip', 'double'),
 ('sex', 'string'),
 ('smoker', 'string'),
 ('day', 'string'),
 ('time', 'string'),
 ('size', 'int')]

# Describe Function Similar to Pandas

In [None]:
df_pyspark.describe().show()

+-------+------------------+------------------+------+------+----+------+------------------+
|summary|        total_bill|               tip|   sex|smoker| day|  time|              size|
+-------+------------------+------------------+------+------+----+------+------------------+
|  count|               244|               244|   244|   244| 244|   244|               244|
|   mean|19.785942622950824|2.9982786885245902|  NULL|  NULL|NULL|  NULL| 2.569672131147541|
| stddev| 8.902411954856857|1.3836381890011815|  NULL|  NULL|NULL|  NULL|0.9510998047322347|
|    min|              3.07|               1.0|Female|    No| Fri|Dinner|                 1|
|    max|             50.81|              10.0|  Male|   Yes|Thur| Lunch|                 6|
+-------+------------------+------------------+------+------+----+------+------------------+



# Adding Columns in Data Frame

In [None]:
from pyspark.sql.functions import col, when, rand

In [None]:
# Lets to create a new columns with random numbers between 0 an 1

seed = 42

df_pyspark = df_pyspark.withColumns(
    {"Visits": (rand(seed) *10).cast("int") + 1} # Pass column name and expression
)
df_pyspark.show(2)

+----------+----+------+------+---+------+----+------+
|total_bill| tip|   sex|smoker|day|  time|size|Visits|
+----------+----+------+------+---+------+----+------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|     7|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|     6|
+----------+----+------+------+---+------+----+------+
only showing top 2 rows



In [None]:
# New column with more than twice visits
df_pyspark = df_pyspark.withColumn(
    "Visited the restaurant more than twice",
    when(col("Visits") > 2, True).otherwise(False)
)

In [None]:
df_pyspark.show(2)

+----------+----+------+------+---+------+----+------+--------------------------------------+
|total_bill| tip|   sex|smoker|day|  time|size|Visits|Visited the restaurant more than twice|
+----------+----+------+------+---+------+----+------+--------------------------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|     7|                                  true|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|     6|                                  true|
+----------+----+------+------+---+------+----+------+--------------------------------------+
only showing top 2 rows



In [None]:
# To drop the colums

df_pyspark = df_pyspark.drop("Visited the restaurant more than twice")
df_pyspark.show(2)

+----------+----+------+------+---+------+----+------+
|total_bill| tip|   sex|smoker|day|  time|size|Visits|
+----------+----+------+------+---+------+----+------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|     7|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|     6|
+----------+----+------+------+---+------+----+------+
only showing top 2 rows



In [None]:
# Rename the columns
df_pyspark = df_pyspark.withColumnRenamed('sex','genre')
df_pyspark.show(2)

+----------+----+------+------+---+------+----+------+--------------------------------------+
|total_bill| tip| genre|smoker|day|  time|size|Visits|Visited the restaurant more than twice|
+----------+----+------+------+---+------+----+------+--------------------------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|     7|                                  true|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|     6|                                  true|
+----------+----+------+------+---+------+----+------+--------------------------------------+
only showing top 2 rows



# Handling Missing Values

## Let's create a Dataset

In [6]:
from pyspark.sql.functions import lit, col
from pyspark.sql.types import IntegerType, StringType, StructType, StructField

In [7]:
schema = StructType([
    StructField("Type of Phenomenon", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Country", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Number of Deaths", IntegerType(), True)
])

In [8]:
data = [
    ("Landslide", 2020, "USA", "California", 15),
    ("Debrisflow", 2019, "Japan", "Kyoto", None),  # NaN value for 'Number of Deaths'
    ("Flood", 2021, "India", None, 120),             # NaN value for 'State'
    ("Landslide", None, "Brazil", "Rio de Janeiro", 8),  # NaN value for 'Year'
    ("Flood", 2022, "China", "Guangdong", None),       # NaN value for 'Number of Deaths'
    ("Debrisflow", 2018, "Italy", "Rome", 5)
]

In [9]:
df_natural_disasters = spark.createDataFrame(data, schema=schema)
df_natural_disasters.show()

+------------------+----+-------+--------------+----------------+
|Type of Phenomenon|Year|Country|         State|Number of Deaths|
+------------------+----+-------+--------------+----------------+
|         Landslide|2020|    USA|    California|              15|
|        Debrisflow|2019|  Japan|         Kyoto|            NULL|
|             Flood|2021|  India|          NULL|             120|
|         Landslide|NULL| Brazil|Rio de Janeiro|               8|
|             Flood|2022|  China|     Guangdong|            NULL|
|        Debrisflow|2018|  Italy|          Rome|               5|
+------------------+----+-------+--------------+----------------+



In [12]:
## Drop NaN Values
print("DataFrame by default drop values")
df_natural_disasters.na.drop().show()

## If we use the parameters of the na property:
print("DataFrame with `how` and using `threshold` parameters")
df_natural_disasters.na.drop(how="any", thresh=2).show()

print("DataFrame with `how` and `subset` parameters")
df_natural_disasters.na.drop(how="any", subset=["Number of Deaths"]).show()

DataFrame by default drop values
+------------------+----+-------+----------+----------------+
|Type of Phenomenon|Year|Country|     State|Number of Deaths|
+------------------+----+-------+----------+----------------+
|         Landslide|2020|    USA|California|              15|
|        Debrisflow|2018|  Italy|      Rome|               5|
+------------------+----+-------+----------+----------------+

DataFrame with how = `any`
+------------------+----+-------+--------------+----------------+
|Type of Phenomenon|Year|Country|         State|Number of Deaths|
+------------------+----+-------+--------------+----------------+
|         Landslide|2020|    USA|    California|              15|
|        Debrisflow|2019|  Japan|         Kyoto|            NULL|
|             Flood|2021|  India|          NULL|             120|
|         Landslide|NULL| Brazil|Rio de Janeiro|               8|
|             Flood|2022|  China|     Guangdong|            NULL|
|        Debrisflow|2018|  Italy|      

In [14]:
### Filling the missing values

df_natural_disasters.na.fill("MISSING VALUES").show()

+------------------+----+-------+--------------+----------------+
|Type of Phenomenon|Year|Country|         State|Number of Deaths|
+------------------+----+-------+--------------+----------------+
|         Landslide|2020|    USA|    California|              15|
|        Debrisflow|2019|  Japan|         Kyoto|            NULL|
|             Flood|2021|  India|MISSING VALUES|             120|
|         Landslide|NULL| Brazil|Rio de Janeiro|               8|
|             Flood|2022|  China|     Guangdong|            NULL|
|        Debrisflow|2018|  Italy|          Rome|               5|
+------------------+----+-------+--------------+----------------+



In [15]:
# To impute the values using advanced techniques

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=["Number of Deaths"],
    outputCols=["{}_imputed".format(c) for c in ["Number of Deaths"]]
).setStrategy("mean")

In [16]:
imputer.fit(df_natural_disasters).transform(df_natural_disasters).show()

+------------------+----+-------+--------------+----------------+------------------------+
|Type of Phenomenon|Year|Country|         State|Number of Deaths|Number of Deaths_imputed|
+------------------+----+-------+--------------+----------------+------------------------+
|         Landslide|2020|    USA|    California|              15|                      15|
|        Debrisflow|2019|  Japan|         Kyoto|            NULL|                      37|
|             Flood|2021|  India|          NULL|             120|                     120|
|         Landslide|NULL| Brazil|Rio de Janeiro|               8|                       8|
|             Flood|2022|  China|     Guangdong|            NULL|                      37|
|        Debrisflow|2018|  Italy|          Rome|               5|                       5|
+------------------+----+-------+--------------+----------------+------------------------+



# Filter operation

In [13]:
# Using SQL syntaxis
df_natural_disasters.filter("`Number of Deaths`<10").show()

+------------------+----+-------+--------------+----------------+
|Type of Phenomenon|Year|Country|         State|Number of Deaths|
+------------------+----+-------+--------------+----------------+
|         Landslide|NULL| Brazil|Rio de Janeiro|               8|
|        Debrisflow|2018|  Italy|          Rome|               5|
+------------------+----+-------+--------------+----------------+



In [18]:
# To be more specific
df_natural_disasters.filter("`Number of Deaths`<10").select(['Country','Type of Phenomenon'])

Country,Type of Phenomenon
Brazil,Landslide
Italy,Debrisflow


In [19]:
# Using PySpark expressions
df_natural_disasters.filter(df_natural_disasters['Number of Deaths']<10)

Type of Phenomenon,Year,Country,State,Number of Deaths
Landslide,,Brazil,Rio de Janeiro,8
Debrisflow,2018.0,Italy,Rome,5


In [21]:
# Using different conditions

df_natural_disasters.filter((df_natural_disasters['Number of Deaths']>10) &
                            (df_natural_disasters['Type of Phenomenon']== 'Landslide'))

Type of Phenomenon,Year,Country,State,Number of Deaths
Landslide,2020,USA,California,15


# GroupBy and Aggregate Functions

Let's create more rows.

In [10]:
from pyspark.sql import Row

new_data = [
    Row("Earthquake", 2023, "Japan", "Kanto", 150),
    Row("Flood", 2022, "India", "Kerala", 40),
    Row("Hurricane", 2021, "USA", "Florida", 85),
    Row("Tornado", 2020, "USA", "Oklahoma", 30),
    Row("Volcano Eruption", 2019, "Indonesia", "Java", 70),
    Row("Earthquake", 2018, "Mexico", "Oaxaca", 90),
    Row("Flood", 2023, "Bangladesh", "Dhaka", 60),
    Row("Tsunami", 2022, "Chile", "Valparaíso", 200),
    Row("Drought", 2021, "Australia", "New South Wales", 5),
    Row("Heatwave", 2020, "France", "Paris", 110),
    Row("Earthquake", 2019, "Nepal", "Kathmandu", 50),
    Row("Flood", 2018, "China", "Guangxi", 45),
    Row("Hurricane", 2023, "Cuba", "Havana", 95),
    Row("Wildfire", 2022, "Brazil", "Amazonas", 25),
    Row("Tornado", 2021, "Canada", "Ontario", 15),
    Row("Earthquake", 2020, "Turkey", "Izmir", 65),
    Row("Flood", 2019, "Italy", "Veneto", 20),
    Row("Hurricane", 2018, "Philippines", "Luzon", 180),
    Row("Volcano Eruption", 2023, "Iceland", "Reykjavik", 10),
    Row("Earthquake", 2022, "Peru", "Lima", 45),
    Row("Flood", 2021, "Pakistan", "Sindh", 80),
    Row("Hurricane", 2020, "Mexico", "Yucatan", 50),
    Row("Tornado", 2019, "Argentina", "Santa Fe", 35),
    Row("Volcano Eruption", 2018, "Italy", "Sicily", 55),
    Row("Earthquake", 2023, "Turkey", "Ankara", 130),
    Row("Flood", 2022, "Nigeria", "Lagos", 60),
    Row("Hurricane", 2021, "Dominican Republic", "Santo Domingo", 75),
    Row("Wildfire", 2020, "Australia", "Victoria", 40),
    Row("Tornado", 2019, "USA", "Kansas", 20),
    Row("Earthquake", 2018, "Greece", "Athens", 85),
    Row("Flood", 2023, "Brazil", "Rio de Janeiro", 70),
    Row("Earthquake", 2022, "Chile", "Santiago", 120),
    Row("Hurricane", 2021, "Cuba", "Havana", 95),
    Row("Tsunami", 2020, "Peru", "Lima", 200),
    Row("Volcano Eruption", 2019, "Ecuador", "Quito", 80),
    Row("Flood", 2018, "Argentina", "Buenos Aires", 30),
    Row("Earthquake", 2023, "Mexico", "Mexico City", 160),
    Row("Drought", 2022, "Venezuela", "Caracas", 10),
    Row("Hurricane", 2021, "Dominican Republic", "Santo Domingo", 75),
    Row("Wildfire", 2020, "Bolivia", "Santa Cruz", 45),
    Row("Flood", 2019, "Colombia", "Bogotá", 55),
    Row("Tornado", 2018, "Paraguay", "Asuncion", 15),
    Row("Volcano Eruption", 2023, "Guatemala", "Antigua", 100),
    Row("Flood", 2022, "Uruguay", "Montevideo", 20),
    Row("Hurricane", 2021, "Honduras", "Tegucigalpa", 65),
    Row("Tsunami", 2020, "El Salvador", "San Salvador", 150),
    Row("Wildfire", 2019, "Panama", "Panama City", 35),
    Row("Earthquake", 2018, "Nicaragua", "Managua", 60),
    Row("Flood", 2023, "Costa Rica", "San Jose", 25),
    Row("Volcano Eruption", 2022, "Nicaragua", "Leon", 90),
    Row("Hurricane", 2021, "Belize", "Belize City", 50),
    Row("Earthquake", 2020, "Haiti", "Port-au-Prince", 180),
    Row("Flood", 2019, "Guatemala", "Guatemala City", 40),
    Row("Wildfire", 2018, "Chile", "Valparaíso", 30),
    Row("Tornado", 2023, "Argentina", "Cordoba", 25),
    Row("Hurricane", 2022, "Mexico", "Cancun", 80),
    Row("Flood", 2021, "Colombia", "Medellin", 60),
    Row("Volcano Eruption", 2020, "Ecuador", "Guayaquil", 70),
    Row("Earthquake", 2019, "Peru", "Arequipa", 50),
    Row("Wildfire", 2018, "Brazil", "Sao Paulo", 20),
    Row("Tsunami", 2023, "Chile", "La Serena", 110),
    Row("Drought", 2022, "Mexico", "Chihuahua", 15),
    Row("Flood", 2021, "Venezuela", "Maracaibo", 45),
    Row("Hurricane", 2020, "Nicaragua", "Bluefields", 85),
    Row("Tornado", 2019, "Brazil", "Porto Alegre", 35)
]

In [None]:
# Cretae a DataFrame with the new rows

new_df = spark.createDataFrame(new_data, schema=["Type of Phenomenon", "Year", "Country", "State", "Number of Deaths"])

df_natural_disasters = df_natural_disasters.union(new_df)

In [12]:
def pyspark(df):
    num_rows = df.count()
    num_cols = len(df.columns)
    print(f"Number of rows: {num_rows}")
    print(f"Number of columns: {num_cols}")
    return

In [13]:
pyspark(df_natural_disasters)

Number of rows: 71
Number of columns: 5


In [22]:
df_natural_disasters.columns

['Type of Phenomenon', 'Year', 'Country', 'State', 'Number of Deaths']

In [25]:
df_natural_disasters.dtypes

[('Type of Phenomenon', 'string'),
 ('Year', 'int'),
 ('Country', 'string'),
 ('State', 'string'),
 ('Number of Deaths', 'int')]

In [17]:
# GroupBy

df_natural_disasters.groupBy('Country').sum('Number of Deaths').show()

+-----------+---------------------+
|    Country|sum(Number of Deaths)|
+-----------+---------------------+
|      India|                  160|
|        USA|                  150|
|      Japan|                  150|
|      China|                   45|
|      Italy|                   80|
|     Brazil|                  158|
|Philippines|                  180|
|     Turkey|                  195|
|     France|                  110|
|     Greece|                   85|
|  Argentina|                   90|
|       Peru|                  295|
|      Chile|                  460|
|    Nigeria|                   60|
|       Cuba|                  190|
| Bangladesh|                   60|
|    Iceland|                   10|
|     Mexico|                  395|
|  Indonesia|                   70|
|     Canada|                   15|
+-----------+---------------------+
only showing top 20 rows



In [21]:
df_natural_disasters.groupBy('Country').count().orderBy('count', ascending=False).show()

+------------------+-----+
|           Country|count|
+------------------+-----+
|            Brazil|    5|
|            Mexico|    5|
|               USA|    4|
|             Chile|    4|
|             Italy|    3|
|         Argentina|    3|
|              Peru|    3|
|         Nicaragua|    3|
|             India|    2|
|             Japan|    2|
|             China|    2|
|            Turkey|    2|
|              Cuba|    2|
|Dominican Republic|    2|
|         Australia|    2|
|           Ecuador|    2|
|         Venezuela|    2|
|         Guatemala|    2|
|          Colombia|    2|
|       Philippines|    1|
+------------------+-----+
only showing top 20 rows



In [22]:
df_natural_disasters.agg({'Number of Deaths': 'sum'}).show()

+---------------------+
|sum(Number of Deaths)|
+---------------------+
|                 4563|
+---------------------+



# MLlib

[MLlib (DataFrame-based)](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html)

[Machine Learning Library (MLlib) Guide](https://spark.apache.org/docs/latest/ml-guide)

In [None]:
df_natural_disasters.write.parquet("path/to/export/df_natural_disasters.parquet")
