In [1]:
import jedi

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setAppName("Spark SQL Course")
sc = SparkContext(conf=conf)

spark = (SparkSession
    .builder
    .appName("Spark SQL Course")
    .getOrCreate()
)

# `DataFrame`

In [26]:
from pyspark.sql import Row

row1 = Row(name="John", age=21)
row2 = Row(name="James", age=32)
row3 = Row(name="Jane", age=18)
row1['name']

'John'

In [27]:
df = spark.createDataFrame([row1, row2, row3])

In [28]:
df

DataFrame[age: bigint, name: string]

In [5]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [6]:
df.show()

+---+-----+
|age| name|
+---+-----+
| 21| John|
| 32|James|
| 18| Jane|
+---+-----+



In [30]:
print(df.rdd.toDebugString().decode("utf-8"))

(4) MapPartitionsRDD[99] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[98] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[97] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[96] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[95] at map at SerDeUtil.scala:137 []
 |  MapPartitionsRDD[94] at mapPartitions at SerDeUtil.scala:184 []
 |  PythonRDD[93] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[92] at parallelize at PythonRDD.scala:195 []


In [29]:
df.rdd.getNumPartitions()

4

## Creating dataframes

In [9]:
#create DataFrame from Row object
rows = [
    Row(name="John", age=21, gender="male"),
    Row(name="James", age=25, gender="female"),
    Row(name="Albert", age=46, gender="male"),
    Row(**{'name': "Caesar", 'age': 56, 'gender': 'male'})
]
df = spark.createDataFrame(rows)
df.show()

+---+------+------+
|age|gender|  name|
+---+------+------+
| 21|  male|  John|
| 25|female| James|
| 46|  male|Albert|
| 56|  male|Caesar|
+---+------+------+



In [31]:
#create DataFrame from Python List

#column names
column_names = ["name", "age", "gender"]

#données
rows = [
    ["John", 21, "male"],
    ["James", 25, "female"],
    ["Albert", 46, "male"], 
    ["Jane", 33, None]
]

df = spark.createDataFrame(rows, column_names)
df.show()

+------+---+------+
|  name|age|gender|
+------+---+------+
|  John| 21|  male|
| James| 25|female|
|Albert| 46|  male|
|  Jane| 33|  null|
+------+---+------+



In [11]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)



In [32]:
#create DataFrame from RDD
column_names = ["name", "age", "gender"]

rdd = sc.parallelize([
    ("John", 21, "male"),
    ("James", 25, "female"),
    ("Albert", 46, "male")
])

df = spark.createDataFrame(rdd, column_names)
df.show()

+------+---+------+
|  name|age|gender|
+------+---+------+
|  John| 21|  male|
| James| 25|female|
|Albert| 46|  male|
+------+---+------+



In [36]:
rdd.collect()

[('John', 21, 'male'), ('James', 25, 'female'), ('Albert', 46, 'male')]

# Schema

In [37]:
df.schema

StructType(List(StructField(name,StringType,true),StructField(age,LongType,true),StructField(gender,StringType,true)))

In [38]:
type(df.schema)

pyspark.sql.types.StructType

In [39]:
# When creating a DataFrame, the schema can be either inferred or defined by the user
from pyspark.sql.types import *

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True)
])

rows = [("John", 21, "male")]

#rows, python list, 数据放首位， 列名放第二个
df = spark.createDataFrame(rows, schema)
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)

+----+---+------+
|name|age|gender|
+----+---+------+
|John| 21|  male|
+----+---+------+



In [18]:
!pwd
from pathlib import Path
import os
dirpath = Path(os.getcwd())
dirpath.joinpath('gro.csv.gz')

/opt/polynote/notebooks/Notebooks


PosixPath('/opt/polynote/notebooks/Notebooks/gro.csv.gz')

In [53]:
df = (spark.read
    .format('csv')
    .option('header', 'true')
    .option('sep', ";")
    .load('gro.csv.gz')
)

In [54]:
df.printSchema()

root
 |-- Id_Customer: string (nullable = true)
 |-- Y: string (nullable = true)
 |-- Customer_Type: string (nullable = true)
 |-- BirthDate: string (nullable = true)
 |-- Customer_Open_Date: string (nullable = true)
 |-- P_Client: string (nullable = true)
 |-- Educational_Level: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Number_Of_Dependant: string (nullable = true)
 |-- Years_At_Residence: string (nullable = true)
 |-- Net_Annual_Income: string (nullable = true)
 |-- Years_At_Business: string (nullable = true)
 |-- Prod_Sub_Category: string (nullable = true)
 |-- Prod_Decision_Date: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Type_Of_Residence: string (nullable = true)
 |-- Nb_Of_Products: string (nullable = true)
 |-- Prod_Closed_Date: string (nullable = true)
 |-- Prod_Category: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)



In [24]:
from datetime import date

?date

In [52]:

products = spark.createDataFrame([
    ('1', 'mouse', 'microsoft', 39.99),
    ('2', 'keyboard', 'logitech', 59.99),
], ['prod_id', 'prod_cat', 'prod_brand', 'prod_value'])

purchases = spark.createDataFrame([
    (date(2017, 11, 1), 2, '1'),
    (date(2017, 11, 2), 1, '1'),
    (date(2017, 11, 5), 1, '2'),
], ['date', 'quantity', 'prod_id'])

# The default join type is the "INNER" join
purchases.join(products, 'prod_id').show()

+-------+----------+--------+--------+----------+----------+
|prod_id|      date|quantity|prod_cat|prod_brand|prod_value|
+-------+----------+--------+--------+----------+----------+
|      1|2017-11-01|       2|   mouse| microsoft|     39.99|
|      1|2017-11-02|       1|   mouse| microsoft|     39.99|
|      2|2017-11-05|       1|keyboard|  logitech|     59.99|
+-------+----------+--------+--------+----------+----------+



In [55]:
products.createOrReplaceTempView("products")
purchases.createOrReplaceTempView("purchases")

query = """
SELECT * FROM
(purchases AS prc INNER JOIN products AS prd 
on prc.prod_id = prd.prod_id)
"""
spark.sql(query).show()


+----------+--------+-------+-------+--------+----------+----------+
|      date|quantity|prod_id|prod_id|prod_cat|prod_brand|prod_value|
+----------+--------+-------+-------+--------+----------+----------+
|2017-11-01|       2|      1|      1|   mouse| microsoft|     39.99|
|2017-11-02|       1|      1|      1|   mouse| microsoft|     39.99|
|2017-11-05|       1|      2|      2|keyboard|  logitech|     59.99|
+----------+--------+-------+-------+--------+----------+----------+



In [71]:
products.withColumn("mise", products.prod_id*10).show()

+-------+--------+----------+----------+----+
|prod_id|prod_cat|prod_brand|prod_value|mise|
+-------+--------+----------+----------+----+
|      1|   mouse| microsoft|     39.99|10.0|
|      2|keyboard|  logitech|     59.99|20.0|
+-------+--------+----------+----------+----+



In [48]:
purchases.join(products, 'prod_id').explain()

== Physical Plan ==
*(5) Project [prod_id#246, date#244, quantity#245L, prod_cat#237, prod_brand#238, prod_value#239]
+- *(5) SortMergeJoin [prod_id#246], [prod_id#236], Inner
   :- *(2) Sort [prod_id#246 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(prod_id#246, 200)
   :     +- *(1) Filter isnotnull(prod_id#246)
   :        +- Scan ExistingRDD[date#244,quantity#245L,prod_id#246]
   +- *(4) Sort [prod_id#236 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(prod_id#236, 200)
         +- *(3) Filter isnotnull(prod_id#236)
            +- Scan ExistingRDD[prod_id#236,prod_cat#237,prod_brand#238,prod_value#239]


In [49]:
new_purchases = spark.createDataFrame([
    (date(2017, 11, 1), 2, '1'),
    (date(2017, 11, 2), 1, '3'),
], ['date', 'quantity', 'prod_id_x'])

# The default join type is the "INNER" join
join_rule = new_purchases.prod_id_x == products.prod_id
new_purchases.join(products, join_rule, 'left').show()


+----------+--------+---------+-------+--------+----------+----------+
|      date|quantity|prod_id_x|prod_id|prod_cat|prod_brand|prod_value|
+----------+--------+---------+-------+--------+----------+----------+
|2017-11-02|       1|        3|   null|    null|      null|      null|
|2017-11-01|       2|        1|      1|   mouse| microsoft|     39.99|
+----------+--------+---------+-------+--------+----------+----------+



In [34]:
new_purchases = spark.createDataFrame([
    (date(2017, 11, 1), 2, '1'),
    (date(2017, 11, 2), 1, '3'),
], ['date', 'quantity', 'prod_id_x'])

# The default join type is the "INNER" join
join_rule = new_purchases.prod_id_x == products.prod_id
new_purchases.join(products, join_rule, 'left').show()

+----------+--------+---------+-------+--------+----------+----------+
|      date|quantity|prod_id_x|prod_id|prod_cat|prod_brand|prod_value|
+----------+--------+---------+-------+--------+----------+----------+
|2017-11-02|       1|        3|   null|    null|      null|      null|
|2017-11-01|       2|        1|      1|   mouse| microsoft|     39.99|
+----------+--------+---------+-------+--------+----------+----------+



# Various types of joins

In [50]:

left = spark.createDataFrame([
    (1, "A1"), (2, "A2"), (3, "A3"), (4, "A4")], 
    ["id", "value"])

right = spark.createDataFrame([
    (3, "A3"), (4, "A4"), (4, "A4_1"), (5, "A5"), (6, "A6")], 
    ["id", "value"])

print("LEFT")
left.orderBy("id").show()

print("RIGHT")
right.orderBy("id").show()

join_types = [
    "inner", "outer", "left", "right",
    "leftsemi", "leftanti"
]

LEFT
+---+-----+
| id|value|
+---+-----+
|  1|   A1|
|  2|   A2|
|  3|   A3|
|  4|   A4|
+---+-----+

RIGHT
+---+-----+
| id|value|
+---+-----+
|  3|   A3|
|  4|   A4|
|  4| A4_1|
|  5|   A5|
|  6|   A6|
+---+-----+



In [51]:
for join_type in join_types:
    print(join_type)
    left.join(right, on="id", how=join_type)\
        .orderBy("id")\
        .show()

inner
+---+-----+-----+
| id|value|value|
+---+-----+-----+
|  3|   A3|   A3|
|  4|   A4|   A4|
|  4|   A4| A4_1|
+---+-----+-----+

outer
+---+-----+-----+
| id|value|value|
+---+-----+-----+
|  1|   A1| null|
|  2|   A2| null|
|  3|   A3|   A3|
|  4|   A4|   A4|
|  4|   A4| A4_1|
|  5| null|   A5|
|  6| null|   A6|
+---+-----+-----+

left
+---+-----+-----+
| id|value|value|
+---+-----+-----+
|  1|   A1| null|
|  2|   A2| null|
|  3|   A3|   A3|
|  4|   A4|   A4|
|  4|   A4| A4_1|
+---+-----+-----+

right
+---+-----+-----+
| id|value|value|
+---+-----+-----+
|  3|   A3|   A3|
|  4|   A4|   A4|
|  4|   A4| A4_1|
|  5| null|   A5|
|  6| null|   A6|
+---+-----+-----+

leftsemi
+---+-----+
| id|value|
+---+-----+
|  3|   A3|
|  4|   A4|
+---+-----+

leftanti
+---+-----+
| id|value|
+---+-----+
|  1|   A1|
|  2|   A2|
+---+-----+

