# 1. Introduction

In [1]:
import pandas as pd

In [2]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Create my_spark
my_spark = SparkSession.builder.getOrCreate()

# Print my_spark
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x000001FD7F35CC48>


In [3]:
my_spark

In [4]:
spark

In [5]:
sc

## 1.1 Loading Data

### Parquet

In [6]:
df = my_spark.read.load(r'D:\spark\spark-3.0.0-preview2-bin-hadoop2.7\examples\src\main\resources\users.parquet')

In [7]:
df.select("name", "favorite_color").collect()

[Row(name='Alyssa', favorite_color=None),
 Row(name='Ben', favorite_color='red')]

### CSV

Exemple 1

In [9]:
# # Don't change this file path
# file_path = "/usr/local/share/datasets/airports.csv"

# # Read in the airports data
# airports = spark.read.csv(file_path, header = True)

# # Show the data
# airports.show()

Exemple 2

In [10]:
df = spark.read.load(r"data/airports.csv",
                     format="csv", sep=",", inferSchema="true", header="true")

In [11]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude_deg: double (nullable = true)
 |-- longitude_deg: double (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- scheduled_service: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- home_link: string (nullable = true)
 |-- wikipedia_link: string (nullable = true)
 |-- keywords: string (nullable = true)



In [12]:
df.createOrReplaceTempView("vuelos")

In [13]:
spark.sql("SELECT * FROM vuelos LIMIT 10").toPandas()

Unnamed: 0,id,ident,type,name,latitude_deg,longitude_deg,elevation_ft,continent,iso_country,iso_region,municipality,scheduled_service,gps_code,iata_code,local_code,home_link,wikipedia_link,keywords
0,6523,00A,heliport,Total Rf Heliport,40.070801,-74.933601,11,,US,US-PA,Bensalem,no,00A,,00A,,,
1,323361,00AA,small_airport,Aero B Ranch Airport,38.704022,-101.473911,3435,,US,US-KS,Leoti,no,00AA,,00AA,,,
2,6524,00AK,small_airport,Lowell Field,59.9492,-151.695999,450,,US,US-AK,Anchor Point,no,00AK,,00AK,,,
3,6525,00AL,small_airport,Epps Airpark,34.864799,-86.770302,820,,US,US-AL,Harvest,no,00AL,,00AL,,,
4,6526,00AR,closed,Newport Hospital & Clinic Heliport,35.6087,-91.254898,237,,US,US-AR,Newport,no,,,,,,00AR
5,322127,00AS,small_airport,Fulton Airport,34.942803,-97.818019,1100,,US,US-OK,Alex,no,00AS,,00AS,,,
6,6527,00AZ,small_airport,Cordes Airport,34.305599,-112.165001,3810,,US,US-AZ,Cordes,no,00AZ,,00AZ,,,
7,6528,00CA,small_airport,Goldstone /Gts/ Airport,35.350498,-116.888,3038,,US,US-CA,Barstow,no,00CA,,00CA,,,
8,324424,00CL,small_airport,Williams Ag Airport,39.427188,-121.763427,87,,US,US-CA,Biggs,no,00CL,,00CL,,,
9,322658,00CN,heliport,Kitchen Creek Helibase Heliport,32.727374,-116.459742,3350,,US,US-CA,Pine Valley,no,00CN,,00CN,,,


In [14]:
# Select everybody, but increment the age by 1
df.select(df['id'], df['ident']).show()

+------+-----+
|    id|ident|
+------+-----+
|  6523|  00A|
|323361| 00AA|
|  6524| 00AK|
|  6525| 00AL|
|  6526| 00AR|
|322127| 00AS|
|  6527| 00AZ|
|  6528| 00CA|
|324424| 00CL|
|322658| 00CN|
|  6529| 00CO|
|  6531| 00FA|
|  6532| 00FD|
|  6533| 00FL|
|  6534| 00GA|
|  6535| 00GE|
|  6536| 00HI|
|  6537| 00ID|
|322581| 00IG|
|  6538| 00II|
+------+-----+
only showing top 20 rows



In [17]:
# Select people older than 21
df.filter(df['id'] > 200).show()

+------+-----+-------------+--------------------+------------------+-------------------+------------+---------+-----------+----------+------------+-----------------+--------+---------+----------+---------+--------------+--------+
|    id|ident|         type|                name|      latitude_deg|      longitude_deg|elevation_ft|continent|iso_country|iso_region|municipality|scheduled_service|gps_code|iata_code|local_code|home_link|wikipedia_link|keywords|
+------+-----+-------------+--------------------+------------------+-------------------+------------+---------+-----------+----------+------------+-----------------+--------+---------+----------+---------+--------------+--------+
|  6523|  00A|     heliport|   Total Rf Heliport|    40.07080078125| -74.93360137939453|          11|       NA|         US|     US-PA|    Bensalem|               no|     00A|     null|       00A|     null|          null|    null|
|323361| 00AA|small_airport|Aero B Ranch Airport|         38.704022|        -101

In [19]:
# Count people by age
#df.groupBy("age").count().show()

In [20]:
# Register the DataFrame as a SQL temporary view named as people
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+------+-----+-------------+--------------------+------------------+-------------------+------------+---------+-----------+----------+------------+-----------------+--------+---------+----------+---------+--------------+--------+
|    id|ident|         type|                name|      latitude_deg|      longitude_deg|elevation_ft|continent|iso_country|iso_region|municipality|scheduled_service|gps_code|iata_code|local_code|home_link|wikipedia_link|keywords|
+------+-----+-------------+--------------------+------------------+-------------------+------------+---------+-----------+----------+------------+-----------------+--------+---------+----------+---------+--------------+--------+
|  6523|  00A|     heliport|   Total Rf Heliport|    40.07080078125| -74.93360137939453|          11|       NA|         US|     US-PA|    Bensalem|               no|     00A|     null|       00A|     null|          null|    null|
|323361| 00AA|small_airport|Aero B Ranch Airport|         38.704022|        -101

In [21]:
# You can set it as a Global temporary view
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()

# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()

+------+-----+-------------+--------------------+------------------+-------------------+------------+---------+-----------+----------+------------+-----------------+--------+---------+----------+---------+--------------+--------+
|    id|ident|         type|                name|      latitude_deg|      longitude_deg|elevation_ft|continent|iso_country|iso_region|municipality|scheduled_service|gps_code|iata_code|local_code|home_link|wikipedia_link|keywords|
+------+-----+-------------+--------------------+------------------+-------------------+------------+---------+-----------+----------+------------+-----------------+--------+---------+----------+---------+--------------+--------+
|  6523|  00A|     heliport|   Total Rf Heliport|    40.07080078125| -74.93360137939453|          11|       NA|         US|     US-PA|    Bensalem|               no|     00A|     null|       00A|     null|          null|    null|
|323361| 00AA|small_airport|Aero B Ranch Airport|         38.704022|        -101

### Json

In [22]:
path = r'D:/spark/spark-3.0.0-preview2-bin-hadoop2.7/examples/src/main/resources/people.json'
peopleDF = spark.read.json(path)

In [23]:
peopleDF.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [24]:
# Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

In [25]:
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()

+------+
|  name|
+------+
|Justin|
+------+



## 1.2 Creating a Dataframe from JSON (object)

In [26]:
# Alternatively, a DataFrame can be created for a JSON dataset represented by
# an RDD[String] storing one JSON object per string
jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()

+----------------+----+
|         address|name|
+----------------+----+
|[Columbus, Ohio]| Yin|
+----------------+----+



## 1.3 Create Spark from DataFrame

In [27]:
xx = pd.DataFrame([[1,2,3],[2,3,4],[3,4,5]], columns = ["A", "B","C"])

In [28]:
spark_df = sqlContext.createDataFrame(xx)

In [29]:
spark_df.show(2)

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  1|  2|  3|
|  2|  3|  4|
+---+---+---+
only showing top 2 rows



In [30]:
spark_df.createOrReplaceTempView("test")

In [31]:
spark.sql("select * from test").collect()

[Row(A=1, B=2, C=3), Row(A=2, B=3, C=4), Row(A=3, B=4, C=5)]

In [32]:
spark_df.toPandas()

Unnamed: 0,A,B,C
0,1,2,3
1,2,3,4
2,3,4,5


## 1.4 Add dataframe to Catalog

In [44]:
spark.catalog.listTables()

[Table(name='pd_temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='people', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='test', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='vuelos', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [33]:
import numpy as np

In [40]:
# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))

# Create spark_temp from pd_temp
spark_temp = spark.createDataFrame(pd_temp)

# Examine the tables in the catalog
print(spark.catalog.listTables())

[Table(name='pd_temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='people', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='test', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='vuelos', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [41]:
# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView("pd_temp")

# Examine the tables in the catalog again
print(spark.catalog.listTables())

[Table(name='pd_temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='people', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='test', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='vuelos', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


## 1.5 WithColumn

In [45]:
# Create the DataFrame flights
flights = spark.table("vuelos")

In [46]:
flights.printSchema()

root
 |-- id: integer (nullable = true)
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude_deg: double (nullable = true)
 |-- longitude_deg: double (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- scheduled_service: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- home_link: string (nullable = true)
 |-- wikipedia_link: string (nullable = true)
 |-- keywords: string (nullable = true)



In [36]:
# Show the head
flights.show()

# Add duration_hrs
flights = flights.withColumn("duration_hrs", flights.air_time/60)

AnalysisException: Table or view not found: flights;;
'UnresolvedRelation [flights]


## 1.6 Filter Columns

In [None]:
# Filter flights by passing a string
long_flights1 = flights.filter("distance > 1000")

# Filter flights by passing a column of boolean values
long_flights2 = flights.filter(flights.distance > 1000)

# Print the data to check they're equal
long_flights1.show()
long_flights2.show()