## **Install PySpark**

In [8]:
!pip install pyspark



## **Create a Spark session**

In [1]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.master('local').appName('PySpark_HandsOn').getOrCreate()  # A SparkSession is the entry point to using Spark.

In [2]:
spark.version

'3.5.4'

## **Get data from the 'sample_data' folder in Colab's left-hand Files panel**

In [2]:
cali_housing_df = spark.read.csv('/content/sample_data/california_housing_test.csv')

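**In the cell above, Spark reads the .csv file without recognizing the header row or inferring the schema, so** ```printSchema()``` **shows generic column names (_c0 to _c8) instead of the real headers, and every column is typed as a string. The header line itself is loaded as an ordinary data row:**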

In [4]:
cali_housing_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)



**Re-reading the file, this time with the options** ```header``` **(use the first line as column names) and** ```inferSchema``` **(detect each column's type from the data):**

In [3]:
cali_housing_df = spark.read.csv('/content/sample_data/california_housing_test.csv', header = True, inferSchema = True)

In [7]:
cali_housing_df.printSchema() # printSchema() now shows the real column names and the inferred types

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



## **Viewing the frame's content:**

In [10]:
cali_housing_df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
|  -118.36|   33.82|              28.0|       67.0|          15.0|      49.0|      11.0|       6.1359|          330000.0|
|  -119.67|   36.33|              19.0|     1241.0|         244.0|     850.0|     237.0|       2.9375|           81700.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+

## **Filtering the frame:**

In [16]:
cali_housing_df.filter(cali_housing_df['housing_median_age'] > 30).show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -119.56|   36.51|              37.0|     1018.0|         213.0|     663.0|     204.0|       1.6635|           67000.0|
|  -121.43|   38.63|              43.0|     1009.0|         225.0|     604.0|     218.0|       1.6641|           67000.0|
|  -118.02|   34.08|              31.0|     2402.0|         632.0|    2830.0|     603.0|       2.3333|          164200.0|
|  -118.24|   33.98|              45.0|      972.0|         249.0|    1288.0|     261.0|       2.2054|          125000.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+

In [5]:
from pyspark.sql.functions import col
cali_housing_df.filter(col('housing_median_age') > 30).show(5)
# cali_housing_df.filter(col('housing_median_age') > 30).select('longitude', 'latitude', 'housing_median_age', 'median_income').show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -119.56|   36.51|              37.0|     1018.0|         213.0|     663.0|     204.0|       1.6635|           67000.0|
|  -121.43|   38.63|              43.0|     1009.0|         225.0|     604.0|     218.0|       1.6641|           67000.0|
|  -118.02|   34.08|              31.0|     2402.0|         632.0|    2830.0|     603.0|       2.3333|          164200.0|
|  -118.24|   33.98|              45.0|      972.0|         249.0|    1288.0|     261.0|       2.2054|          125000.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+

In [6]:
from pyspark.sql import functions as F
cali_housing_df.agg(F.min(col('median_house_value')),
                    F.max(col('median_house_value')),
                    F.count(col('median_house_value')),
                    F.sum(col('median_house_value')),
                    F.avg(col('median_house_value')),
                    F.stddev(col('median_house_value'))).show()

+-----------------------+-----------------------+-------------------------+-----------------------+-----------------------+--------------------------+
|min(median_house_value)|max(median_house_value)|count(median_house_value)|sum(median_house_value)|avg(median_house_value)|stddev(median_house_value)|
+-----------------------+-----------------------+-------------------------+-----------------------+-----------------------+--------------------------+
|                22500.0|               500001.0|                     3000|           6.17538825E8|             205846.275|        113119.68746964622|
+-----------------------+-----------------------+-------------------------+-----------------------+-----------------------+--------------------------+



## **Creating Spark DataFrames**

### Creating a DataFrame from a dictionary

In [None]:
# Dictionary data
data = {'ID' : [1, 2, 3, 4, 5],
        'Name' : ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
        'Department' : ['Sales', 'HR', 'IT', 'Sales', 'IT'],
        'Salary' : [3000, 4000, 5000, 4500, 6000]
        }

**There are several ways to create a Spark DataFrame from a dictionary of data; two common ones are shown here. The first is to build an intermediate pandas DataFrame and pass it to** ```spark.createDataFrame()```**. The second is to zip the dictionary's values into tuples, where each tuple becomes one record (row) of the Spark DataFrame, and pass those tuples together with the column names.**

In [73]:
import pyspark
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.getOrCreate()

In [57]:
# ------------------ WAY-1 ----------------------
import pandas as pd
pd_data = pd.DataFrame(data)
df = spark.createDataFrame(pd_data)
df.show()

+---+-------+----------+------+
| ID|   Name|Department|Salary|
+---+-------+----------+------+
|  1|  Alice|     Sales|  3000|
|  2|    Bob|        HR|  4000|
|  3|Charlie|        IT|  5000|
|  4|  David|     Sales|  4500|
|  5|    Eve|        IT|  6000|
+---+-------+----------+------+



In [70]:
# Rough work: transpose the dictionary's column lists into row tuples
# (data.values() gives one list per column; zip(*...) pairs them up into one tuple per row)
list(zip(*data.values()))

[(1, 'Alice', 'Sales', 3000),
 (2, 'Bob', 'HR', 4000),
 (3, 'Charlie', 'IT', 5000),
 (4, 'David', 'Sales', 4500),
 (5, 'Eve', 'IT', 6000)]

In [72]:
# ------------------ WAY-2 ----------------------
tuple_data = list(zip(*data.values()))  # See the rough work done in prev cell
cols = list(data.keys())  # ['ID', 'Name', 'Department', 'Salary']
df = spark.createDataFrame(tuple_data, cols)
df.show()

+---+-------+----------+------+
| ID|   Name|Department|Salary|
+---+-------+----------+------+
|  1|  Alice|     Sales|  3000|
|  2|    Bob|        HR|  4000|
|  3|Charlie|        IT|  5000|
|  4|  David|     Sales|  4500|
|  5|    Eve|        IT|  6000|
+---+-------+----------+------+



### Creating a DataFrame from a list of tuples (and performing basic analysis)

In [8]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [(1, "Alice", "Sales", 3000),
        (2, "Bob", "HR", 4000),
        (3, "Charlie", "IT", 5000),
        (4, "David", "Sales", 4500),
        (5, "Eve", "IT", 6000) ]

cols = ["ID", "Name", "Department", "Salary"]
df = spark.createDataFrame(data, cols)
df.show()

+---+-------+----------+------+
| ID|   Name|Department|Salary|
+---+-------+----------+------+
|  1|  Alice|     Sales|  3000|
|  2|    Bob|        HR|  4000|
|  3|Charlie|        IT|  5000|
|  4|  David|     Sales|  4500|
|  5|    Eve|        IT|  6000|
+---+-------+----------+------+



In [11]:
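df.count()  # 5 (returned, but not displayed because it isn't the cell's last expression)
df.describe().show()  # Summary statistics (count, mean, stddev, min, max) for every column
df.describe('Salary').show()  # Summary statistics for the 'Salary' column only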

+-------+-----------------+
|summary|           Salary|
+-------+-----------------+
|  count|                5|
|   mean|           4500.0|
| stddev|1118.033988749895|
|    min|             3000|
|    max|             6000|
+-------+-----------------+



In [12]:
# Average salary of each dept
df.groupby('Department').agg({'Salary' : 'avg'}).show()

+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
|     Sales|     3750.0|
|        HR|     4000.0|
|        IT|     5500.0|
+----------+-----------+



In [15]:
from pyspark.sql.functions import col  # imported here so this section runs on its own

# Employees earning more than 4000
df.filter(col('Salary') > 4000).show()

+---+-------+----------+------+
| ID|   Name|Department|Salary|
+---+-------+----------+------+
|  3|Charlie|        IT|  5000|
|  4|  David|     Sales|  4500|
|  5|    Eve|        IT|  6000|
+---+-------+----------+------+



In [16]:
# Sorting employees by their salary in descending order
df.sort(col('Salary').desc()).show()

+---+-------+----------+------+
| ID|   Name|Department|Salary|
+---+-------+----------+------+
|  5|    Eve|        IT|  6000|
|  3|Charlie|        IT|  5000|
|  4|  David|     Sales|  4500|
|  2|    Bob|        HR|  4000|
|  1|  Alice|     Sales|  3000|
+---+-------+----------+------+



In [17]:
spark.stop() # Good practice to run this line after ALL the Spark jobs have finished

# **Ingesting data from a GitHub link into a Spark DataFrame**

**The method** ```spark.read.csv()``` **doesn't fetch a file from an HTTP(S) URL directly; it reads from storage paths such as the local disk or HDFS. One workaround is to load the data into a pandas DataFrame first and then pass that DataFrame to** ```spark.createDataFrame()```**:**

In [20]:
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

csv_url = 'https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv'

# Read the CSV data from the URL into a Pandas DataFrame
pandas_df = pd.read_csv(csv_url)

# Convert the Pandas DataFrame into a Spark DataFrame
iris_df = spark.createDataFrame(pandas_df)

iris_df.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



**To read internet-hosted data with** ```spark.read.csv()``` **itself, first download the file to your local machine (or to Colab's storage) and then pass the downloaded file's local path to the method:**

In [21]:
import os
import requests
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('IrisDataIngestion').getOrCreate()


**We use** ```requests.get()``` **to download the file and save it.**

In [22]:
csv_url = "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv"
csv_path = "/content/iris.csv"  # Saving file in Colab's local storage

# Downloading the file
response = requests.get(csv_url)    # 'requests.get()' sends an HTTP GET request to the URL and returns the server's response (web pages, API data, files like CSVs/JSONs)
response.raise_for_status()         # stop with an error if the download failed (e.g. a 404)
with open(csv_path, 'wb') as file:  # 'wb' (write-binary mode) because response.content is raw bytes; text mode 'w' expects str
    file.write(response.content)    # write the downloaded bytes to the local file

# Check if the file was downloaded successfully
os.path.exists(csv_path)  # o/p: True

True

**Now, we read the local CSV file using PySpark's** ```spark.read.csv()```.

In [24]:
df = spark.read.csv(csv_path, header = True, inferSchema = True)
df.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows

