### Exit SPARK_HOME and run the follow command before staring jupyter

In [1]:
# export SPARK_HOME=/Users/Storage/Soft/spark-3.0.1-bin-hadoop3.2
# export PYSPARK_PYTHON=python3
# export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip:$PYTHONPATH
# export PATH=$SPARK_HOME/bin:$SPARK_HOME/python:$PATH

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

In [2]:
from pyspark.sql.types import StructType, StructField, StringType,IntegerType, DoubleType,BooleanType
from pyspark.sql.functions import *

In [4]:
spark = SparkSession \
            .builder \
            .master("local[1]") \
            .appName('dungcao.com') \
            .getOrCreate()
#             .config("spark.jars", "/Users/Storage/Soft/mysql-connector-java-8.0.30.jar") \

spark.sparkContext.setLogLevel('ERROR')

23/04/24 16:11:44 WARN Utils: Your hostname, Dungs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.10.25 instead (on interface en0)
23/04/24 16:11:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/24 16:12:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x105942940>


## 1. Create DataFrame from Python list

In [6]:
data = [("James","","Smith","36636","M",60000),
        ("Michael","Rose","","40288","M",70000),
        ("Robert","","Williams","42114","",400000),
        ("Maria","Anne","Jones","39192","F",500000),
        ("Jen","Mary","Brown","","F",0)]

columns = ["first_name","middle_name","last_name","dob","gender","salary"]
pysparkDF = spark.createDataFrame(data = data, schema = columns)
pysparkDF.printSchema()
pysparkDF.show(truncate=False)

root
 |-- first_name: string (nullable = true)
 |-- middle_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



[Stage 0:>                                                          (0 + 1) / 1]

+----------+-----------+---------+-----+------+------+
|first_name|middle_name|last_name|dob  |gender|salary|
+----------+-----------+---------+-----+------+------+
|James     |           |Smith    |36636|M     |60000 |
|Michael   |Rose       |         |40288|M     |70000 |
|Robert    |           |Williams |42114|      |400000|
|Maria     |Anne       |Jones    |39192|F     |500000|
|Jen       |Mary       |Brown    |     |F     |0     |
+----------+-----------+---------+-----+------+------+



                                                                                

##### Nested Structure Elements

In [7]:
dataStruct = [(("James","","Smith"),"36636","M","3000"), \
      (("Michael","Rose",""),"40288","M","4000"), \
      (("Robert","","Williams"),"42114","M","4000"), \
      (("Maria","Anne","Jones"),"39192","F","4000"), \
      (("Jen","Mary","Brown"),"","F","-1") \
]

schemaStruct = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
          StructField('dob', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', StringType(), True)
         ])


df = spark.createDataFrame(data=dataStruct, schema = schemaStruct)
df.printSchema()
df.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)

+--------------------+-----+------+------+
|name                |dob  |gender|salary|
+--------------------+-----+------+------+
|[James, , Smith]    |36636|M     |3000  |
|[Michael, Rose, ]   |40288|M     |4000  |
|[Robert, , Williams]|42114|M     |4000  |
|[Maria, Anne, Jones]|39192|F     |4000  |
|[Jen, Mary, Brown]  |     |F     |-1    |
+--------------------+-----+------+------+



### 2. Read from text file (csv, json,...)

In [8]:
df2 = spark.read.option("header",True) \
     .csv("/Users/Storage/OtherTeaching/Python/practice/epldata_final.csv")
df2.printSchema()

root
 |-- name: string (nullable = true)
 |-- club: string (nullable = true)
 |-- age: string (nullable = true)
 |-- position: string (nullable = true)
 |-- position_cat: string (nullable = true)
 |-- market_value: string (nullable = true)
 |-- page_views: string (nullable = true)
 |-- fpl_value: string (nullable = true)
 |-- fpl_sel: string (nullable = true)
 |-- fpl_points: string (nullable = true)
 |-- region: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- new_foreign: string (nullable = true)
 |-- age_cat: string (nullable = true)
 |-- club_id: string (nullable = true)
 |-- big_club: string (nullable = true)
 |-- new_signing: string (nullable = true)



In [9]:
df2.show(5)

+-----------------+-------+---+--------+------------+------------+----------+---------+-------+----------+------+--------------+-----------+-------+-------+--------+-----------+
|             name|   club|age|position|position_cat|market_value|page_views|fpl_value|fpl_sel|fpl_points|region|   nationality|new_foreign|age_cat|club_id|big_club|new_signing|
+-----------------+-------+---+--------+------------+------------+----------+---------+-------+----------+------+--------------+-----------+-------+-------+--------+-----------+
|   Alexis Sanchez|Arsenal| 28|      LW|           1|          65|      4329|       12| 17.10%|       264|     3|         Chile|          0|      4|      1|       1|          0|
|       Mesut Ozil|Arsenal| 28|      AM|           1|          50|      4395|      9.5|  5.60%|       167|     2|       Germany|          0|      4|      1|       1|          0|
|        Petr Cech|Arsenal| 35|      GK|           4|           7|      1529|      5.5|  5.90%|       134|    

#### + Read with defined schema

In [10]:
schema = StructType() \
      .add("RecordNumber",IntegerType(),True) \
      .add("Zipcode",IntegerType(),True) \
      .add("ZipCodeType",StringType(),True) \
      .add("City",StringType(),True) \
      .add("State",StringType(),True) \
      .add("LocationType",StringType(),True) \
      .add("Lat",DoubleType(),True) \
      .add("Long",DoubleType(),True) \
      .add("Xaxis",IntegerType(),True) \
      .add("Yaxis",DoubleType(),True) \
      .add("Zaxis",DoubleType(),True) \
      .add("WorldRegion",StringType(),True) \
      .add("Country",StringType(),True) \
      .add("LocationText",StringType(),True) \
      .add("Location",StringType(),True) \
      .add("Decommisioned",BooleanType(),True) \
      .add("TaxReturnsFiled",StringType(),True) \
      .add("EstimatedPopulation",IntegerType(),True) \
      .add("TotalWages",IntegerType(),True) \
      .add("Notes",StringType(),True)
      
df_with_schema = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load("/Users/Storage/OtherTeaching/Data/zipcodes.csv")
df_with_schema.printSchema()

root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Xaxis: integer (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- TaxReturnsFiled: string (nullable = true)
 |-- EstimatedPopulation: integer (nullable = true)
 |-- TotalWages: integer (nullable = true)
 |-- Notes: string (nullable = true)



In [11]:
df_with_schema.show(5)

+------------+-------+-----------+-------------------+-----+--------------+-----+------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-----+
|RecordNumber|Zipcode|ZipCodeType|               City|State|  LocationType|  Lat|  Long|Xaxis|Yaxis|Zaxis|WorldRegion|Country|        LocationText|            Location|Decommisioned|TaxReturnsFiled|EstimatedPopulation|TotalWages|Notes|
+------------+-------+-----------+-------------------+-----+--------------+-----+------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-----+
|           1|    704|   STANDARD|        PARC PARQUE|   PR|NOT ACCEPTABLE|17.96|-66.22| null|-0.87|  0.3|         NA|     US|     Parc Parque, PR|NA-US-PR-PARC PARQUE|        false|           null|               null|      null| null|
|           2|    704|   STANDARD|PASEO COSTA DEL SUR|  

#### + Write dataframe to file

In [12]:
#df_with_schema.write.option("header",True).csv("local or hdfs path")

#### + Read JSON 

In [13]:
df_with_schema_json = spark.read.schema(schema) \
        .json("/Users/Storage/OtherTeaching/Data/zipcodes.json")
df_with_schema_json.printSchema()
df_with_schema_json.show(5)

root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Xaxis: integer (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- TaxReturnsFiled: string (nullable = true)
 |-- EstimatedPopulation: integer (nullable = true)
 |-- TotalWages: integer (nullable = true)
 |-- Notes: string (nullable = true)

+------------+-------+-----------+-------------------+-----+--------------+-----+------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------

### 3. Create DataFrame from RDD

In [14]:
# read data into RDD
def extract(line):
    split_line = line.split(",")
    result = [split_line[idx] for idx in [0,1,2,3,5,11]]
    return result

rdd = spark.sparkContext.textFile('/Users/Storage/OtherTeaching/Python/practice/epldata_final.csv') \
    .filter(lambda line: len(line.split()) > 1)\
    .map(extract)
# rdd.collect()

In [15]:
# convert RDD to DataFrame
df_rdd = rdd.toDF(["name","club","age","position","market_value","nationality"])
df_rdd.show(5)

+-----------------+-------+---+--------+------------+--------------+
|             name|   club|age|position|market_value|   nationality|
+-----------------+-------+---+--------+------------+--------------+
|   Alexis Sanchez|Arsenal| 28|      LW|          65|         Chile|
|       Mesut Ozil|Arsenal| 28|      AM|          50|       Germany|
|        Petr Cech|Arsenal| 35|      GK|           7|Czech Republic|
|     Theo Walcott|Arsenal| 28|      RW|          20|       England|
|Laurent Koscielny|Arsenal| 31|      CB|          22|        France|
+-----------------+-------+---+--------+------------+--------------+
only showing top 5 rows



In [16]:
df_rdd = df_rdd.withColumn("age",col("age").cast(IntegerType())) \
    .withColumn("market_value", col("market_value").cast(DoubleType()))
df_rdd.printSchema()

root
 |-- name: string (nullable = true)
 |-- club: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- position: string (nullable = true)
 |-- market_value: double (nullable = true)
 |-- nationality: string (nullable = true)



### 4. DataFrame Functions

In [17]:
# select columns
df2.select("name","club","age","position","market_value","nationality") \
    .show(5)

+-----------------+-------+---+--------+------------+--------------+
|             name|   club|age|position|market_value|   nationality|
+-----------------+-------+---+--------+------------+--------------+
|   Alexis Sanchez|Arsenal| 28|      LW|          65|         Chile|
|       Mesut Ozil|Arsenal| 28|      AM|          50|       Germany|
|        Petr Cech|Arsenal| 35|      GK|           7|Czech Republic|
|     Theo Walcott|Arsenal| 28|      RW|          20|       England|
|Laurent Koscielny|Arsenal| 31|      CB|          22|        France|
+-----------------+-------+---+--------+------------+--------------+
only showing top 5 rows



#### Filter

In [18]:
# filter rows
df_rdd.filter(df_rdd.club == 'Chelsea') \
        .select("name","club","age","position","market_value","nationality") \
        .show(5,truncate=False)

+---------------------+-------+---+--------+------------+-----------+
|name                 |club   |age|position|market_value|nationality|
+---------------------+-------+---+--------+------------+-----------+
|Eden Hazard          |Chelsea|26 |LW      |75.0        |Belgium    |
|Diego Costa          |Chelsea|28 |CF      |50.0        |Spain      |
|Gary Cahill          |Chelsea|31 |CB      |16.0        |England    |
|Marcos Alonso Mendoza|Chelsea|26 |LB      |25.0        |Spain      |
|Cesar Azpilicueta    |Chelsea|27 |RB      |30.0        |Spain      |
+---------------------+-------+---+--------+------------+-----------+
only showing top 5 rows



In [19]:
# Filter then sort by column
df_rdd.filter(df_rdd.club == 'Chelsea') \
        .select("name","club","age","position","market_value","nationality") \
        .orderBy('market_value') \
        .show(5, truncate=False)

+----------------+-------+---+--------+------------+-----------+
|name            |club   |age|position|market_value|nationality|
+----------------+-------+---+--------+------------+-----------+
|Eduardo Carvalho|Chelsea|34 |LW      |0.05        |Portugal   |
|Willy Caballero |Chelsea|35 |GK      |1.5         |Argentina  |
|Kurt Zouma      |Chelsea|22 |CB      |15.0        |France     |
|Tiemoue Bakayoko|Chelsea|22 |DM      |16.0        |France     |
|Gary Cahill     |Chelsea|31 |CB      |16.0        |England    |
+----------------+-------+---+--------+------------+-----------+
only showing top 5 rows



In [20]:
# expr = col('club') == 'Chelsea'
# expr = "club == 'Chelsea'" 
# expr = "club <> 'Chelsea'" 
# expr = col('club') != 'Chelsea'
# expr = col('club').isin(['Chelsea'])
# expr = col('club').startswith("Chel")
# expr = col('club').endswith("ea")
expr = col('club').like("%ea%")
df_rdd.filter(expr) \
        .select("name","club","age","position","market_value","nationality") \
        .orderBy('market_value') \
        .show(5, truncate=False)

# NOTE: array_contains(col('club'),"xxx") if club column is an array

+----------------+-------+---+--------+------------+-----------+
|name            |club   |age|position|market_value|nationality|
+----------------+-------+---+--------+------------+-----------+
|Eduardo Carvalho|Chelsea|34 |LW      |0.05        |Portugal   |
|Oliver McBurnie |Swansea|21 |CF      |0.25        |Scotland   |
|Jay Fulton      |Swansea|23 |CM      |0.5         |Scotland   |
|Stephen Kingsley|Swansea|22 |LB      |0.75        |Scotland   |
|Ã€ngel Rangel   |Swansea|34 |RB      |1.0         |Spain      |
+----------------+-------+---+--------+------------+-----------+
only showing top 5 rows



In [21]:
# filter NULL
# expr = "club is NULL"
# expr = df_rdd.club.isNull()
# expr = col("club").isNull()
# expr = "club is not NULL"
# expr = "NOT club is NULL"
expr = col("club").isNotNull()

df_rdd.filter(expr)

DataFrame[name: string, club: string, age: int, position: string, market_value: double, nationality: string]

#### Create new column with concat function

In [22]:
# from pyspark.sql.functions import concat

df_newcol = df_rdd.withColumn('age_pos',concat(col('age'), col('position')))
df_newcol.show(5)

+-----------------+-------+---+--------+------------+--------------+-------+
|             name|   club|age|position|market_value|   nationality|age_pos|
+-----------------+-------+---+--------+------------+--------------+-------+
|   Alexis Sanchez|Arsenal| 28|      LW|        65.0|         Chile|   28LW|
|       Mesut Ozil|Arsenal| 28|      AM|        50.0|       Germany|   28AM|
|        Petr Cech|Arsenal| 35|      GK|         7.0|Czech Republic|   35GK|
|     Theo Walcott|Arsenal| 28|      RW|        20.0|       England|   28RW|
|Laurent Koscielny|Arsenal| 31|      CB|        22.0|        France|   31CB|
+-----------------+-------+---+--------+------------+--------------+-------+
only showing top 5 rows



#### Drop a column

In [23]:
df_newcol.drop('age_pos').show(5)

+-----------------+-------+---+--------+------------+--------------+
|             name|   club|age|position|market_value|   nationality|
+-----------------+-------+---+--------+------------+--------------+
|   Alexis Sanchez|Arsenal| 28|      LW|        65.0|         Chile|
|       Mesut Ozil|Arsenal| 28|      AM|        50.0|       Germany|
|        Petr Cech|Arsenal| 35|      GK|         7.0|Czech Republic|
|     Theo Walcott|Arsenal| 28|      RW|        20.0|       England|
|Laurent Koscielny|Arsenal| 31|      CB|        22.0|        France|
+-----------------+-------+---+--------+------------+--------------+
only showing top 5 rows



#### Drop row with NULL value

In [24]:
# df.na.drop().show(truncate=False)
# df.na.drop(how="any").show(truncate=False)
# df.na.drop(subset=["club","nationality"]).show(truncate=False)
# df.dropna().show(truncate=False)

#### Drop duplicate rows

In [25]:
data = [("James", "Sales", 3000), \
    ("Michael", "Sales", 4600), \
    ("Robert", "Sales", 4100), \
    ("Maria", "Finance", 3000), \
    ("James", "Sales", 3000), \
    ("Scott", "Finance", 3300), \
    ("Jen", "Finance", 3900), \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000), \
    ("Saif", "Sales", 4100) \
  ]
columns= ["employee_name", "department", "salary"]
xdf = spark.createDataFrame(data = data, schema = columns)
xdf.printSchema()
xdf.show(truncate=False)

xdistinctDF = xdf.distinct()
print("Distinct count: "+str(xdistinctDF.count()))
xdistinctDF.show(truncate=False)

xdf2 = xdf.dropDuplicates()
print("Distinct count: "+str(xdf2.count()))
xdf2.show(truncate=False)

xdropDisDF = xdf.dropDuplicates(["department","salary"])
print("Distinct count of department salary : "+str(xdropDisDF.count()))
xdropDisDF.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



                                                                                

Distinct count: 9




+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Jen          |Finance   |3900  |
|Michael      |Sales     |4600  |
|Scott        |Finance   |3300  |
|Kumar        |Marketing |2000  |
|James        |Sales     |3000  |
|Robert       |Sales     |4100  |
|Jeff         |Marketing |3000  |
|Saif         |Sales     |4100  |
|Maria        |Finance   |3000  |
+-------------+----------+------+



                                                                                

Distinct count: 9
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Jen          |Finance   |3900  |
|Michael      |Sales     |4600  |
|Scott        |Finance   |3300  |
|Kumar        |Marketing |2000  |
|James        |Sales     |3000  |
|Robert       |Sales     |4100  |
|Jeff         |Marketing |3000  |
|Saif         |Sales     |4100  |
|Maria        |Finance   |3000  |
+-------------+----------+------+



                                                                                

Distinct count of department salary : 8
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Jen          |Finance   |3900  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |
|Kumar        |Marketing |2000  |
|James        |Sales     |3000  |
|Jeff         |Marketing |3000  |
+-------------+----------+------+



#### GROUP BY

In [26]:
# from pyspark.sql.functions import sum

df_newcol.select('club',col('market_value').cast('float')).groupBy('club') \
    .agg(sum('market_value').alias('total_value')) \
    .orderBy(col('total_value').desc()) \
    .show()



+-----------------+------------------+
|             club|       total_value|
+-----------------+------------------+
|          Arsenal|             550.0|
|  Manchester+City|             528.0|
|Manchester+United| 514.1000000014901|
|          Chelsea|486.55000000074506|
|        Tottenham|             460.0|
|        Liverpool|             440.5|
|          Everton|            282.75|
|      Southampton|             230.0|
|   Leicester+City|             207.5|
|         West+Ham|             186.0|
|   Crystal+Palace|            162.25|
|       Stoke+City|             150.0|
|          Swansea|             139.0|
|          Watford|             118.5|
|      Bournemouth|             117.5|
| Newcastle+United|            104.25|
|        West+Brom|            101.25|
|          Burnley|             71.25|
|Brighton+and+Hove|              55.0|
|     Huddersfield| 50.14999997615814|
+-----------------+------------------+





#### expr function 

In [27]:
from pyspark.sql.functions import expr

#Concatenate columns
data=[("James","Bond"),("Scott","Varsa")] 
df=spark.createDataFrame(data).toDF("col1","col2") 
df.withColumn("Name",expr("col1 ||','|| col2")).show()

#Using CASE WHEN sql expression
data = [("James","M"),("Michael","F"),("Jen","")]
columns = ["name","gender"]
df = spark.createDataFrame(data = data, schema = columns)
df2 = df.withColumn("gender", expr("CASE WHEN gender = 'M' THEN 'Male' " +
           "WHEN gender = 'F' THEN 'Female' ELSE 'unknown' END"))
df2.show()

#Add months from a value of another column
data=[("2023-01-23",1),("2023-06-24",2),("2023-09-20",3)] 
df=spark.createDataFrame(data).toDF("date","increment") 

df.select(df.date,df.increment,
     expr("add_months(date,increment)")
  .alias("inc_date")).show()

# Providing alias using 'as'
df.select(df.date,df.increment,
     expr("""add_months(date,increment) as inc_date""")
  ).show()

# Add
df.select(df.date,df.increment,
     expr("increment + 5 as new_increment")
  ).show()

df.select("increment",expr("cast(increment as string) as str_increment")).printSchema()

#Use expr()  to filter the rows
data=[(100,2),(200,3000),(500,500)] 
df=spark.createDataFrame(data).toDF("col1","col2") 
df.filter(expr("col1 == col2")).show()

+-----+-----+-----------+
| col1| col2|       Name|
+-----+-----+-----------+
|James| Bond| James,Bond|
|Scott|Varsa|Scott,Varsa|
+-----+-----+-----------+

+-------+-------+
|   name| gender|
+-------+-------+
|  James|   Male|
|Michael| Female|
|    Jen|unknown|
+-------+-------+

+----------+---------+----------+
|      date|increment|  inc_date|
+----------+---------+----------+
|2023-01-23|        1|2023-02-23|
|2023-06-24|        2|2023-08-24|
|2023-09-20|        3|2023-12-20|
+----------+---------+----------+

+----------+---------+----------+
|      date|increment|  inc_date|
+----------+---------+----------+
|2023-01-23|        1|2023-02-23|
|2023-06-24|        2|2023-08-24|
|2023-09-20|        3|2023-12-20|
+----------+---------+----------+

+----------+---------+-------------+
|      date|increment|new_increment|
+----------+---------+-------------+
|2023-01-23|        1|            6|
|2023-06-24|        2|            7|
|2023-09-20|        3|            8|
+----------+----

#### Sort function

In [28]:
df.sort(col('col2').desc()).show()

+----+----+
|col1|col2|
+----+----+
| 200|3000|
| 500| 500|
| 100|   2|
+----+----+



#### PIVOT TABLE

In [29]:
# from pyspark.sql.functions import round

pivotDF = df_newcol.select('club',col('market_value').cast('float'), 'position') \
        .groupBy("club").pivot("position").agg(round(sum("market_value"),2))
# pivotDF.printSchema()
pivotDF.show(truncate=False)

# pivotDF = df_newcol.select('club',col('market_value').cast('float'), 'position') \
#         .groupBy("club","position") \
#       .sum("market_value") \
#       .groupBy("club") \
#       .pivot("position") \
#       .sum("sum(market_value)")
# pivotDF.printSchema()
# pivotDF.show(truncate=False)

                                                                                

+-----------------+----+-----+-----+-----+-----+----+----+-----+-----+-----+----+-----+----+
|club             |AM  |CB   |CF   |CM   |DM   |GK  |LB  |LM   |LW   |RB   |RM  |RW   |SS  |
+-----------------+----+-----+-----+-----+-----+----+----+-----+-----+-----+----+-----+----+
|Tottenham        |40.0|70.0 |73.0 |87.0 |50.0 |28.0|40.0|null |37.0 |10.0 |null|25.0 |null|
|Brighton+and+Hove|null|12.25|9.0  |13.75|null |3.75|3.5 |null |3.25 |1.0  |0.5 |8.0  |null|
|West+Ham         |15.0|28.0 |21.0 |21.0 |12.0 |2.5 |19.0|null |18.0 |11.5 |null|38.0 |null|
|Leicester+City   |null|18.5 |70.0 |12.0 |38.0 |10.5|8.5 |6.0  |9.0  |5.0  |null|30.0 |null|
|Arsenal          |50.0|78.0 |92.0 |65.0 |60.0 |15.0|38.0|null |75.0 |35.0 |22.0|20.0 |null|
|Manchester+United|65.0|103.0|98.0 |117.0|4.0  |46.1|42.0|5.0  |12.0 |22.0 |null|null |null|
|West+Brom        |null|9.0  |25.0 |5.25 |11.0 |5.5 |null|4.0  |20.5 |10.5 |null|10.5 |null|
|Burnley          |null|8.5  |17.0 |13.0 |9.0  |4.0 |1.5 |13.25|null |

#### JOIN DataFrame

In [30]:
emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.show(truncate=False)


dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.show(truncate=False)
  
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner") \
     .show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"outer") \
    .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"full") \
    .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter") \
    .show(truncate=False)
    
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left") \
    .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftouter") \
   .show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"right") \
   .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"rightouter") \
   .show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftsemi") \
   .show(truncate=False)
   
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftanti") \
   .show(truncate=False)
   
empDF.alias("emp1").join(empDF.alias("emp2"), \
    col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
    .select(col("emp1.emp_id"),col("emp1.name"), \
      col("emp2.emp_id").alias("superior_emp_id"), \
      col("emp2.name").alias("superior_emp_name")) \
   .show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_em



+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|n



+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|n

#### Union 2 DataFrame

In [31]:
df_total = df_newcol.select('club',col('market_value').cast('float')).groupBy('club') \
    .agg(sum('market_value').alias('total_value')) \
    .orderBy(col('total_value').desc())

df_big6 = df_total.filter("total_value > 300")
df_big6.show()

df_big_rest = df_total.filter("total_value <= 300")
df_big_rest.show()

+-----------------+------------------+
|             club|       total_value|
+-----------------+------------------+
|          Arsenal|             550.0|
|  Manchester+City|             528.0|
|Manchester+United| 514.1000000014901|
|          Chelsea|486.55000000074506|
|        Tottenham|             460.0|
|        Liverpool|             440.5|
+-----------------+------------------+

+-----------------+-----------------+
|             club|      total_value|
+-----------------+-----------------+
|          Everton|           282.75|
|      Southampton|            230.0|
|   Leicester+City|            207.5|
|         West+Ham|            186.0|
|   Crystal+Palace|           162.25|
|       Stoke+City|            150.0|
|          Swansea|            139.0|
|          Watford|            118.5|
|      Bournemouth|            117.5|
| Newcastle+United|           104.25|
|        West+Brom|           101.25|
|          Burnley|            71.25|
|Brighton+and+Hove|             55.0|
|

In [32]:
df_big6.union(df_big_rest).show()



+-----------------+------------------+
|             club|       total_value|
+-----------------+------------------+
|          Arsenal|             550.0|
|  Manchester+City|             528.0|
|Manchester+United| 514.1000000014901|
|          Chelsea|486.55000000074506|
|        Tottenham|             460.0|
|        Liverpool|             440.5|
|          Everton|            282.75|
|      Southampton|             230.0|
|   Leicester+City|             207.5|
|         West+Ham|             186.0|
|   Crystal+Palace|            162.25|
|       Stoke+City|             150.0|
|          Swansea|             139.0|
|          Watford|             118.5|
|      Bournemouth|             117.5|
| Newcastle+United|            104.25|
|        West+Brom|            101.25|
|          Burnley|             71.25|
|Brighton+and+Hove|              55.0|
|     Huddersfield| 50.14999997615814|
+-----------------+------------------+





#### WHEN function

In [33]:
# from pyspark.sql.functions import when

data = [("James","M"),("Michael","F"),("Jen","")]
df = spark.createDataFrame(data = data, schema = ["name","gender"])
df2 = df.withColumn("gender", when(col('gender') == 'M','Male')
                              .when(col('gender') == 'F','Female') 
                            .otherwise('unknown'))
df2.show()

+-------+-------+
|   name| gender|
+-------+-------+
|  James|   Male|
|Michael| Female|
|    Jen|unknown|
+-------+-------+



#### Statistic functions

In [34]:
# from pyspark.sql.functions import collect_set,sum,avg,max,countDistinct,count,collect_list
# from pyspark.sql.functions import first, last, kurtosis, min, mean, skewness 
# from pyspark.sql.functions import variance, stddev, stddev_samp, stddev_pop, sumDistinct, var_samp, var_pop

In [35]:
df_total.select(collect_list("total_value")).show(truncate=False)
df_total.select(collect_set("total_value")).show(truncate=False)
df_total.select(countDistinct("club", "total_value")).show(truncate=False)
df_total.select(first("total_value")).show(truncate=False)
df_total.select(last("total_value")).show(truncate=False)
df_total.select(kurtosis("total_value")).show(truncate=False)
df_total.select(max("total_value")).show(truncate=False)
df_total.select(min("total_value")).show(truncate=False)
df_total.select(mean("total_value")).show(truncate=False)
df_total.select(skewness("total_value")).show(truncate=False)
df_total.select(stddev("total_value"), stddev_samp("total_value"),stddev_pop("total_value")).show(truncate=False)
df_total.select(sum("total_value")).show(truncate=False)
df_total.select(sumDistinct("total_value")).show(truncate=False)
df_total.select(variance("total_value"),var_samp("total_value"),var_pop("total_value")).show(truncate=False)

                                                                                

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|collect_list(total_value)                                                                                                                                                           |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[550.0, 528.0, 514.1000000014901, 486.55000000074506, 460.0, 440.5, 282.75, 230.0, 207.5, 186.0, 162.25, 150.0, 139.0, 118.5, 117.5, 104.25, 101.25, 71.25, 55.0, 50.14999997615814]|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+





+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|collect_set(total_value)                                                                                                                                                            |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[230.0, 207.5, 440.5, 71.25, 139.0, 55.0, 550.0, 282.75, 186.0, 50.14999997615814, 514.1000000014901, 486.55000000074506, 117.5, 528.0, 104.25, 150.0, 118.5, 101.25, 162.25, 460.0]|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



                                                                                

+---------------------------------+
|count(DISTINCT club, total_value)|
+---------------------------------+
|20                               |
+---------------------------------+



                                                                                

+------------------+
|first(total_value)|
+------------------+
|550.0             |
+------------------+



                                                                                

+-----------------+
|last(total_value)|
+-----------------+
|50.14999997615814|
+-----------------+

+---------------------+
|kurtosis(total_value)|
+---------------------+
|-1.2237757194547998  |
+---------------------+





+----------------+
|max(total_value)|
+----------------+
|550.0           |
+----------------+





+-----------------+
|min(total_value) |
+-----------------+
|50.14999997615814|
+-----------------+



                                                                                

+------------------+
|avg(total_value)  |
+------------------+
|247.72749999891965|
+------------------+

+---------------------+
|skewness(total_value)|
+---------------------+
|0.6268312950071274   |
+---------------------+

+------------------------+------------------------+-----------------------+
|stddev_samp(total_value)|stddev_samp(total_value)|stddev_pop(total_value)|
+------------------------+------------------------+-----------------------+
|177.52779802871657      |177.52779802871657      |173.03269378726827     |
+------------------------+------------------------+-----------------------+

+-----------------+
|sum(total_value) |
+-----------------+
|4954.549999978393|
+-----------------+





+-------------------------+
|sum(DISTINCT total_value)|
+-------------------------+
|4954.549999978393        |
+-------------------------+

+---------------------+---------------------+--------------------+
|var_samp(total_value)|var_samp(total_value)|var_pop(total_value)|
+---------------------+---------------------+--------------------+
|31516.119072924786   |31516.119072924786   |29940.313119278544  |
+---------------------+---------------------+--------------------+



#### CREATE VIEW

In [36]:
df_rdd.createOrReplaceTempView("epldata")
spark.sql("SELECT * from epldata").show(5)

+-----------------+-------+---+--------+------------+--------------+
|             name|   club|age|position|market_value|   nationality|
+-----------------+-------+---+--------+------------+--------------+
|   Alexis Sanchez|Arsenal| 28|      LW|        65.0|         Chile|
|       Mesut Ozil|Arsenal| 28|      AM|        50.0|       Germany|
|        Petr Cech|Arsenal| 35|      GK|         7.0|Czech Republic|
|     Theo Walcott|Arsenal| 28|      RW|        20.0|       England|
|Laurent Koscielny|Arsenal| 31|      CB|        22.0|        France|
+-----------------+-------+---+--------+------------+--------------+
only showing top 5 rows



In [37]:
spark.sql("SELECT club, AVG(age) AS avg_age, AVG(market_value) AS avg_marketvalue FROM epldata GROUP BY club").show()

+-----------------+------------------+------------------+
|             club|           avg_age|   avg_marketvalue|
+-----------------+------------------+------------------+
|        Tottenham|             25.65|              23.0|
|Brighton+and+Hove|27.952380952380953| 2.619047619047619|
|         West+Ham|27.238095238095237| 8.857142857142858|
|   Leicester+City|             27.25| 8.645833333333334|
|          Arsenal|26.678571428571427|19.642857142857142|
|Manchester+United|             25.56|            20.564|
|        West+Brom|28.210526315789473| 5.328947368421052|
|          Burnley|27.944444444444443|3.9583333333333335|
|      Bournemouth|            26.875| 4.895833333333333|
| Newcastle+United|              26.4|            5.2125|
|      Southampton|24.956521739130434|              10.0|
|          Swansea|              27.0|              5.56|
|     Huddersfield|              26.0|1.7910714285714284|
|   Crystal+Palace|28.047619047619047| 7.726190476190476|
|        Liver

#### DateTime function

In [38]:
data = [("1","2019-07-01"),("2","2019-06-24"),("3","2019-08-24")]

df=spark.createDataFrame(data=data,schema=["id","date"])

df.select(
      col("date"),
      current_date().alias("current_date"),
      datediff(current_date(),col("date")).alias("datediff")
    ).show()

df.withColumn("datesDiff", datediff(current_date(),col("date"))) \
  .withColumn("monthsDiff", months_between(current_date(),col("date"))) \
  .withColumn("monthsDiff_round",round(months_between(current_date(),col("date")),2)) \
  .withColumn("yearsDiff",months_between(current_date(),col("date"))/lit(12)) \
  .withColumn("yearsDiff_round",round(months_between(current_date(),col("date"))/lit(12),2)) \
  .show()

data2 = [("1","07-01-2019"),("2","06-24-2019"),("3","08-24-2019")]  
df2=spark.createDataFrame(data=data2,schema=["id","date"])
df2.select(
    to_date(col("date"),"MM-dd-yyyy").alias("date"),
    current_date().alias("endDate")
    )

#SQL

spark.sql("select round(months_between(current_date(), '2019-07-01')/12,2) as years_diff").show()

+----------+------------+--------+
|      date|current_date|datediff|
+----------+------------+--------+
|2019-07-01|  2023-04-13|    1382|
|2019-06-24|  2023-04-13|    1389|
|2019-08-24|  2023-04-13|    1328|
+----------+------------+--------+

+---+----------+---------+-----------+----------------+------------------+---------------+
| id|      date|datesDiff| monthsDiff|monthsDiff_round|         yearsDiff|yearsDiff_round|
+---+----------+---------+-----------+----------------+------------------+---------------+
|  1|2019-07-01|     1382|45.38709677|           45.39|3.7822580641666668|           3.78|
|  2|2019-06-24|     1389|45.64516129|           45.65| 3.803763440833333|            3.8|
|  3|2019-08-24|     1328|43.64516129|           43.65|3.6370967741666664|           3.64|
+---+----------+---------+-----------+----------------+------------------+---------------+

+----------+
|years_diff|
+----------+
|      3.78|
+----------+



In [39]:
data=[["1","02-01-2020 11 01 19 06"],["2","03-01-2019 12 01 19 406"],["3","03-01-2021 12 01 19 406"]]
df2=spark.createDataFrame(data,["id","input"])
df2.show(truncate=False)

#current_timestamp()
df2.select(current_timestamp().alias("current_timestamp")).show(1,truncate=False)

#to_timestamp()
df2.select(col("input"), 
    to_timestamp(col("input"), "MM-dd-yyyy HH mm ss SSS").alias("to_timestamp") 
  ).show(truncate=False)


#hour, minute,second
data=[["1","2020-02-01 11:01:19.06"],["2","2019-03-01 12:01:19.406"],["3","2021-03-01 12:01:19.406"]]
df3=spark.createDataFrame(data,["id","input"])

df3.select(col("input"), 
    hour(col("input")).alias("hour"), 
    minute(col("input")).alias("minute"),
    second(col("input")).alias("second") 
  ).show(truncate=False)

+---+-----------------------+
|id |input                  |
+---+-----------------------+
|1  |02-01-2020 11 01 19 06 |
|2  |03-01-2019 12 01 19 406|
|3  |03-01-2021 12 01 19 406|
+---+-----------------------+

+-----------------------+
|current_timestamp      |
+-----------------------+
|2023-04-13 22:38:25.702|
+-----------------------+
only showing top 1 row

+-----------------------+-----------------------+
|input                  |to_timestamp           |
+-----------------------+-----------------------+
|02-01-2020 11 01 19 06 |2020-02-01 11:01:19.06 |
|03-01-2019 12 01 19 406|2019-03-01 12:01:19.406|
|03-01-2021 12 01 19 406|2021-03-01 12:01:19.406|
+-----------------------+-----------------------+

+-----------------------+----+------+------+
|input                  |hour|minute|second|
+-----------------------+----+------+------+
|2020-02-01 11:01:19.06 |11  |1     |19    |
|2019-03-01 12:01:19.406|12  |1     |19    |
|2021-03-01 12:01:19.406|12  |1     |19    |
+-------------

### 5. READ FROM MYSQL

##### read table

In [42]:
from pyspark import SQLContext

sqlContext = SQLContext(spark.sparkContext)

df = sqlContext \
      .read \
      .format("jdbc") \
      .option("url", "jdbc:mysql://localhost:3306/classicmodels") \
      .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("user","root") \
    .option("password", "QA2015!@#") \
      .option("dbtable", "customers") \
      .load()

# Looks the schema of this DataFrame.
df.printSchema()

df.show(5)

root
 |-- customerNumber: integer (nullable = true)
 |-- customerName: string (nullable = true)
 |-- contactLastName: string (nullable = true)
 |-- contactFirstName: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- addressLine1: string (nullable = true)
 |-- addressLine2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postalCode: string (nullable = true)
 |-- country: string (nullable = true)
 |-- salesRepEmployeeNumber: integer (nullable = true)
 |-- creditLimit: decimal(10,2) (nullable = true)

+--------------+--------------------+---------------+----------------+------------+--------------------+------------+---------+--------+----------+---------+----------------------+-----------+
|customerNumber|        customerName|contactLastName|contactFirstName|       phone|        addressLine1|addressLine2|     city|   state|postalCode|  country|salesRepEmployeeNumber|creditLimit|
+--------------+--------------------+-

##### query

In [41]:
df = sqlContext \
    .read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/classicmodels") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("user","root") \
    .option("password", "QA2015!@#") \
    .option("query", "SELECT customerNumber, customerName, salesRepEmployeeNumber FROM customers") \
    .load()

# Looks the schema of this DataFrame.
df.printSchema()

df.show(5)

root
 |-- customerNumber: integer (nullable = true)
 |-- customerName: string (nullable = true)
 |-- salesRepEmployeeNumber: integer (nullable = true)

+--------------+--------------------+----------------------+
|customerNumber|        customerName|salesRepEmployeeNumber|
+--------------+--------------------+----------------------+
|           103|   Atelier graphique|                  1370|
|           112|  Signal Gift Stores|                  1166|
|           114|Australian Collec...|                  1611|
|           119|   La Rochelle Gifts|                  1370|
|           121|  Baane Mini Imports|                  1504|
+--------------+--------------------+----------------------+
only showing top 5 rows



##### Save to MySQL

In [None]:
# df.write
#       .format("jdbc") \
#     .option("url", "jdbc:mysql://localhost:3306/classicmodels") \
#       .option("driver", "com.mysql.cj.jdbc.Driver") \
#     .option("user","root") \
#     .option("password", "QA2015!@#") \
#       .option("dbtable", "customers") \
#   .save()