# Intro to Pyspark

In [1]:
# https://spark.apache.org/
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=7f03a45d34040b0dca2bcfefcdc158da5123557869b08fd7ba056d1be999bc9e
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
from pyspark import SparkContext

# Create a SparkContext
sc = SparkContext("local", "My Spark App")

In [None]:
# Verify SparkContext
print(sc)

# Print Spark version
print(sc.version)

<SparkContext master=local appName=My Spark App>
3.4.1


In [4]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Create my_spark
spark = SparkSession.builder.getOrCreate()

# Print my_spark
print(spark)

<pyspark.sql.session.SparkSession object at 0x7d9dee46b160>


In [5]:
# Print the tables in the catalog
print(spark.catalog.listTables())

[]


In [6]:
import pandas as pd
from pyspark.sql import SparkSession

flights = spark.read.csv('/content/airport.csv', header=True, inferSchema=True)

flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [None]:
flights.createOrReplaceTempView("flights_temp_table")

query = 'FROM flights_temp_table SELECT * LIMIT 10'

# Get the first 10 rows of flights
flights10 = spark.sql(query)

# Show the results
flights10.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [None]:
query = "SELECT origin, dest, COUNT(*) as N FROM flights_temp_table GROUP BY origin, dest"

# Run the query
flight_counts = spark.sql(query)

# Convert the results to a pandas DataFrame
pd_counts = flight_counts.toPandas()

# Print the head of pd_counts
print(pd_counts.head())

  origin dest    N
0    SEA  RNO    8
1    SEA  DTW   98
2    SEA  CLE    2
3    SEA  LAX  450
4    PDX  SEA  144


In [None]:
import numpy as np

# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))

# Create spark_temp from pd_temp
spark_temp = spark.createDataFrame(pd_temp)

# Examine the tables in the catalog
print(spark.catalog.listTables())

# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView("temp")

# # Examine the tables in the catalog again
print("temp")

[Table(name='flights_temp_table', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]
temp


# People Data Analysis

Spark DataFrames are the workhouse and main way of working with Spark and Python post Spark 2.0. DataFrames act as powerful versions of tables, with rows and columns, easily handling large datasets. The shift to DataFrames provides many advantages:

- A much simpler syntax
- Ability to use SQL directly in the dataframe
- Operations are automatically distributed across RDDs

If you've used R or even the pandas library with Python you are probably already familiar with the concept of DataFrames. Spark DataFrame expand on a lot of these concepts, allowing you to transfer that knowledge easily by understanding the simple syntax of Spark DataFrames. Remember that the main advantage to using Spark DataFrames vs those other programs is that Spark can handle data across many RDDs, huge data sets that would never fit on a single computer.

In [7]:
# creating a dataframe

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Basics").getOrCreate()

In [8]:
# Load json file in people dataframe

people = spark.read.json('/content/people.json')

people.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [9]:
# print schema of people df

people.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [10]:
# Find the column names of df

people.columns

['age', 'name']

In [11]:
# Find datatype of the columns

people.describe()

DataFrame[summary: string, age: string, name: string]

In [12]:
# Infer Schema in df

people.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



Some datatypes make it easier to infer schema (like tabular formats such as csv).

However you often have to set the schema yourself if you aren't dealing with a .read method that doesn't have inferSchema() built-in.

Spark has all the tools you need for this, it just requires a very specific structure:

In [13]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

we need to create the list of Structure fields

* :param name: string, name of the field.
* :param dataType: :class:`DataType` of the field.
* :param nullable: boolean, whether the field can be null (None) or not.

In [27]:
# Define schema for the file

data_schema = [StructField('age', IntegerType(), True), StructField('name', StringType(), False)]

final_struct = StructType(fields = data_schema)

people1 = spark.read.json('/content/people.json', schema = final_struct)

people1.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [18]:
# Understanding the data

people['age']

Column<'age'>

In [31]:
type(people['age'])

pyspark.sql.column.Column

In [32]:
people.select('age')

DataFrame[age: int]

In [33]:
type(people.select('age'))

pyspark.sql.dataframe.DataFrame

In [34]:
people.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [36]:
# returns 2 records

people.head(2)

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [39]:
people.select(['name', 'age']).show()

+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+



In [41]:
# Adding new columns by copying

people.withColumn('newage', people['age']).show()

+----+-------+------+
| age|   name|newage|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    30|
|  19| Justin|    19|
+----+-------+------+



In [42]:
people.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [46]:
# Rename column

people.withColumnRenamed('age','superage').show()

+--------+-------+
|superage|   name|
+--------+-------+
|    null|Michael|
|      30|   Andy|
|      19| Justin|
+--------+-------+



In [48]:
# Create new column with double the age

people.withColumn('Double age', people['age']*2).show()

+----+-------+----------+
| age|   name|Double age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



In [50]:
# Create new column by adding 5 years to age column

people.withColumn('After 5', people['age']+5).show()

+----+-------+-------+
| age|   name|After 5|
+----+-------+-------+
|null|Michael|   null|
|  30|   Andy|     35|
|  19| Justin|     24|
+----+-------+-------+



In [59]:
# Create new column by halfing the age
import pyspark.sql.functions as F

people.withColumn('Half age', F.round(people['age']/2)).show()

+----+-------+--------+
| age|   name|Half age|
+----+-------+--------+
|null|Michael|    null|
|  30|   Andy|    15.0|
|  19| Justin|    10.0|
+----+-------+--------+



In [69]:
# Using SQL

# Register the Datframe as a SQL temporary view

people.createOrReplaceTempView('people_sql')

results_sql = spark.sql('SELECT * FROM people_sql')

results_sql.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [77]:
# filter all people whose age is >30 years

results_age = spark.sql('SELECT * FROM people_sql WHERE age>=30')

results_age.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

