# Getting Started with PySpark in Google Colab

PySpark is the Python interface for Apache Spark. Its primary use cases are working with huge amounts of data and building data pipelines.

Spark and PySpark come preinstalled on Colab instances, but you can also install them on your own cloud instances.

See the PySpark API documentation for more: https://spark.apache.org/docs/latest/api/python/

# 1. Try PySpark in Google Colab

In [1]:
import pyspark
from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

# Create the SparkSession (or reuse an existing one) that serves as the entry
# point for all DataFrame and SQL work in this notebook.
spark = (
    SparkSession.builder
    .appName("Our First Spark Example")
    .getOrCreate()
)

In [2]:
# As the cell's last expression, the session object is rendered by the notebook.
spark

# 2. Reading Data

For this example, we use a publicly available dataset in CSV format: COVID-19 data aggregated by Our World in Data (https://ourworldindata.org).

In [3]:
import os
import requests

path = "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/owid-covid-data.csv"

# Download the OWID COVID-19 CSV. The timeout stops a stalled download from
# hanging the cell forever. raise_for_status() surfaces HTTP errors instead of
# writing an error page to disk and then parsing it as data.
req = requests.get(path, timeout=120)
req.raise_for_status()
url_content = req.content

csv_file_name = 'owid-covid-data.csv'
# `with` guarantees the file is flushed and closed even if the write fails.
with open(csv_file_name, 'wb') as csv_file:
    csv_file.write(url_content)

# Read back exactly the file that was just written. The previous hard-coded
# '/content/' prefix only worked when the working directory was /content.
df = spark.read.csv(os.path.abspath(csv_file_name), header=True, inferSchema=True)

In [4]:
# Column names taken from the CSV header row (read with header=True).
df.columns

['iso_code',
 'continent',
 'location',
 'date',
 'total_cases',
 'new_cases',
 'new_cases_smoothed',
 'total_deaths',
 'new_deaths',
 'new_deaths_smoothed',
 'total_cases_per_million',
 'new_cases_per_million',
 'new_cases_smoothed_per_million',
 'total_deaths_per_million',
 'new_deaths_per_million',
 'new_deaths_smoothed_per_million',
 'reproduction_rate',
 'icu_patients',
 'icu_patients_per_million',
 'hosp_patients',
 'hosp_patients_per_million',
 'weekly_icu_admissions',
 'weekly_icu_admissions_per_million',
 'weekly_hosp_admissions',
 'weekly_hosp_admissions_per_million',
 'total_tests',
 'new_tests',
 'total_tests_per_thousand',
 'new_tests_per_thousand',
 'new_tests_smoothed',
 'new_tests_smoothed_per_thousand',
 'positive_rate',
 'tests_per_case',
 'tests_units',
 'total_vaccinations',
 'people_vaccinated',
 'people_fully_vaccinated',
 'total_boosters',
 'new_vaccinations',
 'new_vaccinations_smoothed',
 'total_vaccinations_per_hundred',
 'people_vaccinated_per_hundred',
 'peo

In [5]:
# Total number of rows in the COVID DataFrame.
df.count()

429435

In [6]:
rdd_raw = df.rdd
# RDD rows are accessed by position below, so look the positions up from the
# DataFrame's column list instead of hard-coding them. This stays correct if
# the upstream CSV adds or reorders columns. A renamed column fails loudly
# (ValueError) instead of silently reading the wrong field.
CONTINENT_INDEX = df.columns.index("continent")
NEW_CASES_INDEX = df.columns.index("new_cases")

# Clean up null data before mapping. Drop rows whose key (continent) or value
# (new_cases) is null, because float(None) in the map step would fail.
rdd_filtered = rdd_raw.filter(
    lambda row: row[CONTINENT_INDEX] is not None and row[NEW_CASES_INDEX] is not None
)

In [7]:
# Inspect the first row that passed the null filter (returned as a one-element list).
rdd_filtered.take(1)

[Row(iso_code='AFG', continent='Asia', location='Afghanistan', date=datetime.date(2020, 1, 5), total_cases=0, new_cases=0, new_cases_smoothed=None, total_deaths=0, new_deaths=0, new_deaths_smoothed=None, total_cases_per_million=0.0, new_cases_per_million=0.0, new_cases_smoothed_per_million=None, total_deaths_per_million=0.0, new_deaths_per_million=0.0, new_deaths_smoothed_per_million=None, reproduction_rate=None, icu_patients=None, icu_patients_per_million=None, hosp_patients=None, hosp_patients_per_million=None, weekly_icu_admissions=None, weekly_icu_admissions_per_million=None, weekly_hosp_admissions=None, weekly_hosp_admissions_per_million=None, total_tests=None, new_tests=None, total_tests_per_thousand=None, new_tests_per_thousand=None, new_tests_smoothed=None, new_tests_smoothed_per_thousand=None, positive_rate=None, tests_per_case=None, tests_units=None, total_vaccinations=None, people_vaccinated=None, people_fully_vaccinated=None, total_boosters=None, new_vaccinations=None, new_

In [8]:
# MAP: turn every row into a (continent, new_cases) key/value pair.
# new_cases is cast to float, so the reduced totals are floats.
def _to_continent_cases(row):
    return row[CONTINENT_INDEX], float(row[NEW_CASES_INDEX])


mapped_rdd = rdd_filtered.map(_to_continent_cases)

In [9]:
# REDUCE: for each continent key, combine its values pairwise by addition,
# yielding one (continent, total) pair per continent.
reduced_rdd = mapped_rdd.reduceByKey(lambda left, right: left + right)

In [10]:
# COLLECT is an action. It runs the lazily defined map and reduce steps above
# and returns the (continent, total) pairs to the driver as a Python list.
total_cases_by_continent = reduced_rdd.collect()

title = "## MapReduce RDD Results: Total New Cases per Continent"
print(title)
print("-" * 50)
for name, total in total_cases_by_continent:
    print("Continent: **{}**, Total New Cases: **{:,.2f}**".format(name, total))

## MapReduce RDD Results: Total New Cases per Continent
--------------------------------------------------
Continent: **Asia**, Total New Cases: **301,564,180.00**
Continent: **Europe**, Total New Cases: **252,916,868.00**
Continent: **Africa**, Total New Cases: **13,146,831.00**
Continent: **South America**, Total New Cases: **68,811,012.00**
Continent: **Oceania**, Total New Cases: **15,003,468.00**
Continent: **North America**, Total New Cases: **124,492,698.00**


# 3. PySpark also lets you use DataFrames (similar to pandas DataFrames) with Spark!

In [11]:
# View the DataFrame schema: column names, inferred types, and nullability.
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: integer (nullable = true)
 |-- new_cases: integer (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: integer (nullable = true)
 |-- new_deaths: integer (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: integer (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: integer (nullable = true)
 |-- hosp_patients_per_mil

In [12]:
# Convert the `date` column with to_date(). select() returns a NEW one-column
# DataFrame that is only displayed here; `df` itself is unchanged, and its
# `date` column was already inferred as a date by the CSV reader.
df.select(F.to_date(F.col("date")).alias("date"))

DataFrame[date: date]

In [13]:
# Summary statistics (count, mean, stddev, min, max) for every column.
df.describe().show()

+-------+--------+-------------+-----------+--------------------+------------------+------------------+------------------+------------------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+------------------+------------------+------------------------+------------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-------------------+------------------+------------------------+----------------------+------------------+-------------------------------+-------------------+------------------+-------------+--------------------+--------------------+-----------------------+--------------------+------------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+-------------

In [14]:
# DataFrame filtering: keep Sweden's rows only, most recent date first.
sweden = df.where(F.col("location") == "Sweden")
sweden.sort(F.col("date").desc()).show()

+--------+---------+--------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+-------------------------------------+------------------------------+----------------------------------

In [15]:
# Simple group-by: total new cases per location, largest first.
# "location" also holds aggregate entries (e.g. "World", income groups),
# so those appear in the ranking alongside individual countries.
(
    df.groupBy("location")
    .agg(F.sum("new_cases"))
    .orderBy(F.col("sum(new_cases)").desc())
    .show(truncate=False)
)

+-----------------------------+--------------+
|location                     |sum(new_cases)|
+-----------------------------+--------------+
|World                        |775935057     |
|High-income countries        |429044052     |
|Asia                         |301564180     |
|Europe                       |252916868     |
|Upper-middle-income countries|251756125     |
|European Union (27)          |185822587     |
|North America                |124492698     |
|United States                |103436829     |
|China                        |99373219      |
|Lower-middle-income countries|92019711      |
|South America                |68811012      |
|India                        |45041748      |
|France                       |38997490      |
|Germany                      |38437756      |
|Brazil                       |37511921      |
|South Korea                  |34571873      |
|Japan                        |33803572      |
|Italy                        |26781078      |
|United Kingd

In [16]:
# Load the California housing sample from Colab's sample_data folder.
# This rebinds `df`, so the COVID DataFrame is no longer available under that name.
df = spark.read.csv("/content/sample_data/california_housing_train.csv", header=True, inferSchema=True)

In [17]:
# Column names and the types Spark inferred from the housing CSV.
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [18]:
# Print the first N (here 5) rows as a text table.
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
+---------+--------+----

In [19]:
# Number of rows in the housing DataFrame.
df.count()

17000

In [20]:
# Project two columns and print the first 5 rows.
df.select("housing_median_age","total_rooms").show(5)

+------------------+-----------+
|housing_median_age|total_rooms|
+------------------+-----------+
|              15.0|     5612.0|
|              19.0|     7650.0|
|              17.0|      720.0|
|              14.0|     1501.0|
|              20.0|     1454.0|
+------------------+-----------+
only showing top 5 rows



In [21]:
# Summary statistics (count, mean, stddev, min, max) for each housing column.
df.describe().show()

+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+
|summary|          longitude|          latitude|housing_median_age|      total_rooms|   total_bedrooms|        population|       households|     median_income|median_house_value|
+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+
|  count|              17000|             17000|             17000|            17000|            17000|             17000|            17000|             17000|             17000|
|   mean|-119.56210823529375|  35.6252247058827| 28.58935294117647|2643.664411764706|539.4108235294118|1429.5739411764705|501.2219411764706| 3.883578100000021|207300.91235294117|
| stddev| 2.0051664084260357|2.1373397946570867|12.586936981660406|2179.947071452777|421.4994515798648| 1

In [22]:
# Distinct total_rooms values; show() prints only the first 20 rows by default.
df.select('total_rooms').distinct().show()

+-----------+
|total_rooms|
+-----------+
|      934.0|
|     3980.0|
|     4142.0|
|      596.0|
|     1761.0|
|     5983.0|
|     2815.0|
|     6433.0|
|      299.0|
|     2734.0|
|      769.0|
|     1051.0|
|     7554.0|
|     4066.0|
|     2862.0|
|     3597.0|
|      692.0|
|      720.0|
|     1765.0|
|     2523.0|
+-----------+
only showing top 20 rows



In [23]:
from pyspark.sql import functions as F

# For each distinct total_rooms value, add up housing_median_age across the
# rows that share it. The result column is named "sum(housing_median_age)".
test = df.groupBy("total_rooms").agg(F.sum(F.col("housing_median_age")))

In [24]:
# Collect the aggregated result to the driver as a pandas DataFrame
# (toPandas pulls every row into driver memory).
test.toPandas()

Unnamed: 0,total_rooms,sum(housing_median_age)
0,934.0,135.0
1,3980.0,25.0
2,4142.0,37.0
3,596.0,25.0
4,1761.0,154.0
...,...,...
5528,3620.0,18.0
5529,947.0,62.0
5530,710.0,52.0
5531,91.0,43.0


In [25]:
# Count missing (null) values per column. This only counts; nothing is removed.
# when() yields a non-null value exactly where the column is null, and count()
# counts non-null values, so each output cell is that column's null count.
null_counts = [
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df.columns
]
df.select(null_counts).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+



# Creating a Test Spark DataFrame

In [27]:

# Build a tiny in-memory DataFrame from Python tuples. The schema is given
# only as column names, so Spark infers the column types.
columns = ["firstname", "lastname", "id"]
data = [
    (first, "Smith", n)
    for n, first in enumerate(["John", "Jane", "Jonas"], start=1)
]

df = spark.createDataFrame(data=data, schema=columns)
df.show()


+---------+--------+---+
|firstname|lastname| id|
+---------+--------+---+
|     John|   Smith|  1|
|     Jane|   Smith|  2|
|    Jonas|   Smith|  3|
+---------+--------+---+



In [None]:
# Evaluating a DataFrame displays only its schema summary, not its rows;
# use df.show() to see the data.
df

# 4. Challenge: Try replicating your Map-Reduce code from Lab 3 in Spark!

1. Start Spark

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# getOrCreate() returns the already-running SparkSession if one exists
# (e.g. from the cells above) instead of creating a new one.
spark = SparkSession.builder.appName("NASA Logs Analysis").getOrCreate()
spark


2. Upload the NASA log file

In [3]:
from google.colab import files
# Opens Colab's upload widget. The output below shows the file being saved
# under its original name, nasa_access_log_aug95_sample.txt.
uploaded = files.upload()


Saving nasa_access_log_aug95_sample.txt to nasa_access_log_aug95_sample.txt


3. Load your NASA file as an RDD

In [4]:
import os

# The upload saves the file under a relative name, so resolve it against the
# current working directory instead of assuming that directory is /content.
log_path = os.path.abspath("nasa_access_log_aug95_sample.txt")
rdd = spark.sparkContext.textFile(log_path)
# Each RDD element is one raw log line (a str).
rdd.take(5)


['159.142.165.138 - - [15/Aug/1995:11:03:22 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179',
 '134.131.38.18 - - [22/Aug/1995:13:43:38 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179',
 'os2c14.aca.ilstu.edu - - [31/Aug/1995:21:47:11 -0400] "GET /shuttle/missions/sts-69/sts-69-patch-small.gif HTTP/1.0" 200 8083',
 'suba01.suba.com - - [24/Aug/1995:04:48:23 -0400] "GET /htbin/wais.pl?TISP HTTP/1.0" 200 1349',
 '146.138.145.170 - - [08/Aug/1995:16:30:51 -0400] "GET /shuttle/missions/sts-62/sts-62-patch-small.gif HTTP/1.0" 200 14385']

1. Which files were most popular (GET requests)?

In [5]:
import re

# Captures the method and path from the quoted request field of a log line,
# e.g. "GET /images/NASA-logosmall.gif HTTP/1.0" -> ("GET", "/images/...").
REQUEST_RE = re.compile(r'"(\S+) ([^"\s]+)')


def _get_path(line):
    """Return the requested path when `line` is a GET request, otherwise None.

    The method field is checked exactly, rather than with `"GET" in line`, so
    lines that merely contain "GET" elsewhere are not counted. A missing or
    malformed request field yields None instead of raising IndexError, as
    `line.split()[6]` would.
    """
    match = REQUEST_RE.search(line)
    if match is None or match.group(1) != "GET":
        return None
    return match.group(2)


# keep only GET requests, reduced to the requested file path
get_paths = rdd.map(_get_path).filter(lambda path: path is not None)

# map: (path, 1) for each request
mapped_files = get_paths.map(lambda path: (path, 1))

# reduce: count each file
file_counts = mapped_files.reduceByKey(lambda a, b: a + b)

# top 10 by count; the key is negated so takeOrdered's ascending order puts the largest first
top_files = file_counts.takeOrdered(10, key=lambda x: -x[1])
top_files


[('/images/NASA-logosmall.gif', 6193),
 ('/images/KSC-logosmall.gif', 4917),
 ('/images/MOSAIC-logosmall.gif', 4216),
 ('/images/WORLD-logosmall.gif', 4212),
 ('/images/USA-logosmall.gif', 4210),
 ('/images/ksclogo-medium.gif', 3942),
 ('/ksc.html', 2781),
 ('/history/apollo/images/apollo-logo1.gif', 2378),
 ('/images/launch-logo.gif', 2194),
 ('/images/ksclogosmall.gif', 1846)]

2. What day had the most requests?

In [6]:
import re

def extract_date(line):
    """Return the dd/Mon/yyyy date that opens a log line's [timestamp], or None.

    Only the day part is returned; the time and timezone are dropped.
    """
    found = re.search(r"\[(\d{2}/\w{3}/\d{4})", line)
    if found is None:
        return None
    return found.group(1)

# MAP each line to (date, 1), dropping lines with no parseable timestamp.
dates = (
    rdd.map(lambda line: (extract_date(line), 1))
       .filter(lambda pair: pair[0] is not None)
)

# REDUCE: number of requests per day.
date_counts = dates.reduceByKey(lambda x, y: x + y)

# takeOrdered with the count negated returns the busiest day as a one-element list.
most_requests = date_counts.takeOrdered(1, key=lambda pair: -pair[1])
most_requests


[('31/Aug/1995', 5717)]

3. How many HTTP 200 (OK) responses?

In [7]:
def _is_status_200(line):
    """Return True when the status field of a log line is exactly "200".

    The status is the second-to-last whitespace-separated field
    (`... "GET /path HTTP/1.0" 200 4179`). Checking that field avoids false
    matches of the substring " 200 " elsewhere in the line, such as in a URL.
    """
    parts = line.split()
    return len(parts) >= 2 and parts[-2] == "200"


status_200 = rdd.filter(_is_status_200)
count_200 = status_200.count()
count_200


88996

4. How many responses had other HTTP status codes?

In [8]:
# extract status code
def get_status(line):
    """Return the HTTP status field of a log line, or None if the line is too short.

    The status is taken to be the second-to-last whitespace-separated field
    (`... "GET /path HTTP/1.0" 200 4179`). It is returned unchanged as a
    string, so placeholder values such as '-' pass through.
    """
    parts = line.split()
    # An explicit length check replaces the bare `except:`, which would also
    # have swallowed unrelated errors (even KeyboardInterrupt).
    if len(parts) < 2:
        return None
    return parts[-2]

# MAP each line to (status, 1), dropping lines that have no status field.
status_rdd = (
    rdd.map(lambda line: (get_status(line), 1))
       .filter(lambda pair: pair[0] is not None)
)

# REDUCE: number of responses per status code.
status_counts = status_rdd.reduceByKey(lambda x, y: x + y)

status_counts.collect()


[('200', 88996),
 ('302', 1644),
 ('404', 652),
 ('403', 12),
 ('501', 2),
 ('-', 1),
 ('304', 8596)]

5. What are the largest, smallest, and average response sizes?

In [9]:
def get_size(line):
    """Return the response size (the last field of a log line) as an int, or None.

    None is returned for empty lines and for non-numeric sizes such as a
    '-' placeholder.
    """
    parts = line.split()
    if not parts:
        return None
    # Only int() can fail now; catch exactly that instead of a bare `except:`.
    try:
        return int(parts[-1])
    except ValueError:
        return None

# Parse every line's response size, dropping lines without a numeric size.
sizes = rdd.map(get_size).filter(lambda x: x is not None)
# max(), min() and mean() are three separate actions. Cache the parsed sizes
# so the log file is read and parsed once instead of three times.
sizes.cache()

max_size = sizes.max()
min_size = sizes.min()
avg_size = sizes.mean()

(max_size, min_size, avg_size)


(3155499, 0, 17458.96599690874)