In [35]:
from pyspark.sql import SparkSession

# Configure a session builder with the application name, then obtain the
# session from it (getOrCreate() is the builder's terminal call).
session_builder = SparkSession.builder.appName('helloSpark')
spark = session_builder.getOrCreate()

Further info on Spark sessions:  
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/spark_session.html

In [36]:
# A bare name as the last expression of a cell makes the notebook display that
# object; here it shows the Spark session created in the first cell.
spark

In [37]:
# Let's generate some data for analysis
#
# Each row is a [name, salary] pair: the name is picked from `names` and the
# salary is an integer drawn from start_range..end_range (both inclusive).

import random

RANDOM_SEED = 42   # fixed seed so a "Restart & Run All" reproduces the same rows
N_ROWS = 500_000   # number of synthetic rows to generate

names = ["Alice", "Ben", "Charles", "Daisy"]
start_range = 900
end_range = 5000

# A dedicated generator keeps the data reproducible without re-seeding the
# module-level RNG that other cells may be using.
rng = random.Random(RANDOM_SEED)
python_data = [
    [rng.choice(names), rng.randint(start_range, end_range)]
    for _ in range(N_ROWS)
]

In [38]:
# spark.createDataFrame turns an in-memory Python object (list, dict) into a
# Spark DataFrame. Supplying the schema as a DDL string gives the columns
# readable names and spares Spark from inferring the schema from the data.
schema = "name STRING, salary INT"

df = spark.createDataFrame(data=python_data, schema=schema)

In [39]:
# .show() prints the leading rows as a text table; n=20 and truncate=True are
# its defaults, written out here for clarity.
df.show(n=20, truncate=True)

+-------+------+
|   name|salary|
+-------+------+
|    Ben|  3216|
|  Alice|  1920|
|  Alice|  3300|
|    Ben|  4468|
|  Alice|  1094|
|Charles|  3456|
|    Ben|  4487|
|  Daisy|  1744|
|Charles|  4603|
|  Daisy|  1143|
|Charles|  4781|
|    Ben|  4084|
|  Alice|  2825|
|    Ben|  4284|
|  Daisy|  3168|
|Charles|  1723|
|Charles|  1252|
|  Alice|  4753|
|  Daisy|  1271|
|  Alice|  1840|
+-------+------+
only showing top 20 rows



In [40]:
# Group the rows by name, then average the salary inside each group.
# The resulting aggregate column is called "avg(salary)".
salary_by_name = df.groupBy("name")
df_new = salary_by_name.avg("salary")

In [41]:
# Print the per-name averages (n=20 and truncate=True are .show()'s defaults).
df_new.show(n=20, truncate=True)

+-------+------------------+
|   name|       avg(salary)|
+-------+------------------+
|Charles|2949.6538070415654|
|    Ben| 2951.933745717701|
|  Alice| 2945.608111883646|
|  Daisy|  2948.88866524493|
+-------+------------------+



In [42]:
# Most column-level functions live in the pyspark.sql.functions module,
# conventionally imported under the alias F.
import pyspark.sql.functions as F

# Show the raw average next to a rounded copy of it, exposed as "average".
rounded_average = F.round("avg(salary)").alias("average")
df_new.select("name", "avg(salary)", rounded_average).show()

+-------+------------------+-------+
|   name|       avg(salary)|average|
+-------+------------------+-------+
|Charles|2949.6538070415654| 2950.0|
|    Ben| 2951.933745717701| 2952.0|
|  Alice| 2945.608111883646| 2946.0|
|  Daisy|  2948.88866524493| 2949.0|
+-------+------------------+-------+



The following cells repeat the same aggregation with the popular Python package `pandas`, for comparison.

In [43]:
import pandas as pd 

# Build the pandas counterpart of the Spark DataFrame from the same Python list.
column_names = ["name", "salary"]
pd_df = pd.DataFrame(data=python_data, columns=column_names)

In [44]:
# A bare name as the last expression of a cell makes the notebook display the
# frame; pandas abbreviates long frames, showing only the first and last rows.
pd_df

Unnamed: 0,name,salary
0,Ben,3216
1,Alice,1920
2,Alice,3300
3,Ben,4468
4,Alice,1094
...,...,...
499995,Daisy,2495
499996,Daisy,1287
499997,Charles,3418
499998,Alice,4571


In [45]:
# Average salary per name, as a DataFrame indexed by name.
# The column is selected explicitly before aggregating: the first positional
# parameter of GroupBy.mean() is `numeric_only`, not a column name, so
# .mean("salary") only worked because a non-empty string is truthy.
pd_df.groupby("name")[["salary"]].mean()

Unnamed: 0_level_0,salary
name,Unnamed: 1_level_1
Alice,2945.608112
Ben,2951.933746
Charles,2949.653807
Daisy,2948.888665


Which of these seemed to be faster?  
Why?

Let's have a quick walkthrough of a few more PySpark methods.  
For a longer (full) list of methods, see:  
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html

In [46]:
# Load both CSV inputs into DataFrames; header=True makes Spark take the
# column names from the first line of each file.
employees_df = spark.read.csv("input/employees.csv", header=True)
departments_df = spark.read.csv("input/departments.csv", header=True)

# Print each frame in turn
for frame in (employees_df, departments_df):
    frame.show()

+------+-----+----------+------+
|emp_id| name|department|salary|
+------+-----+----------+------+
|     1| John|        HR| 50000|
|     2|Alice|        IT| 60000|
|     3|  Bob|   Finance| 55000|
+------+-----+----------+------+

+----------+-------+
|department|manager|
+----------+-------+
|        HR|   Anna|
|        IT|  David|
|   Finance|  Sarah|
+----------+-------+



In [47]:

# The salary column arrives from the CSV as a string; replace it with an
# integer version so numeric comparisons in later filters behave properly.
salary_as_int = F.col("salary").cast("integer")
employees_df = employees_df.withColumn("salary", salary_as_int)

employees_df.show()

+------+-----+----------+------+
|emp_id| name|department|salary|
+------+-----+----------+------+
|     1| John|        HR| 50000|
|     2|Alice|        IT| 60000|
|     3|  Bob|   Finance| 55000|
+------+-----+----------+------+



In [None]:

# .filter() keeps only the rows matching a condition: salaries above 55000.
high_salary = employees_df["salary"] > 55000
filtered_df = employees_df.filter(high_salary)

# .select() keeps only the listed columns.
kept_columns = ["emp_id", "name", "department", "salary"]
selected_df = filtered_df.select(kept_columns)

# .withColumnRenamed() renames a single column (old name, new name).
renamed_df = selected_df.withColumnRenamed("emp_id", "employee_id")

# .selectExpr() projects SQL expressions; the last one derives a new column.
projections = [
    "employee_id",
    "name",
    "department",
    "salary",
    "salary / 12 as monthly_salary",
]
expr_df = renamed_df.selectExpr(*projections)

# .join() combines two DataFrames: an inner join with the departments
# DataFrame on the shared 'department' column.
joined_df = expr_df.join(departments_df, "department", "inner")


In [None]:

# .write exposes the DataFrame writer. Here the result is saved as CSV with a
# header line, replacing any previous output at that path. Further writer
# options are covered in the next classes.
joined_df.write.csv("output/joined_employees", mode="overwrite", header=True)

# NB - Spark evaluates lazily: transformations (.filter(), .select(), .join(), ...)
# only describe the computation and do not run anything by themselves.
# Chaining several transformations is therefore cheap. The work is executed
# only once an action is called, such as .show() or a write like the one above.

In [57]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round

# Obtain a Spark session for the tourism analysis.
# NOTE(review): getOrCreate() may hand back a session that is already running
# in this kernel, in which case the earlier application name could stay in
# effect - confirm if the name matters.
spark = (
    SparkSession.builder
    .appName("ThailandTourismAnalysis")
    .getOrCreate()
)

# Load the tourism dataset from its Parquet file
tourism_path = "thailand_domestic_tourism_2019_2023_ver2.parquet"
df = spark.read.parquet(tourism_path)

# Take a first look at the rows
df.show()


+-------------------+---------------+--------------------+-----------+----------+------------------+-----+
|               date|  province_thai|        province_eng|region_thai|region_eng|          variable|value|
+-------------------+---------------+--------------------+-----------+----------+------------------+-----+
|2019-01-01 00:00:00|  กรุงเทพมหานคร|             Bangkok|    ภาคกลาง|   central|ratio_tourist_stay|93.37|
|2019-01-01 00:00:00|         ลพบุรี|            Lopburi |    ภาคกลาง|   central|ratio_tourist_stay|61.32|
|2019-01-01 00:00:00|พระนครศรีอยุธยา|Phra Nakhon Si Ay...|    ภาคกลาง|   central|ratio_tourist_stay|73.37|
|2019-01-01 00:00:00|        สระบุรี|           Saraburi |    ภาคกลาง|   central|ratio_tourist_stay|67.33|
|2019-01-01 00:00:00|         ชัยนาท|            Chainat |    ภาคกลาง|   central|ratio_tourist_stay|79.31|
|2019-01-01 00:00:00|         นครปฐม|      Nakhon Pathom |    ภาคกลาง|   central|ratio_tourist_stay| 71.7|
|2019-01-01 00:00:00|      สิงห์บุรี|

In [58]:

# Namespaced access to the Spark column functions: F.round works on Column
# expressions, whereas a bare `round` is Python's builtin unless another cell
# has shadowed it with pyspark's version.
from pyspark.sql import functions as F

# Display Schema
df.printSchema()

# Show Sample Data
df.show(5)

# Exploratory Data Analysis
print(f"Total records: {df.count()}")
df.describe().show()

# The data is in long format: the measure name is in 'variable' and its number
# in 'value'. Pivot so that every distinct 'variable' becomes its own column,
# with one row per (date, province_eng).
# Assumes 'variable' contains no_tourist_all, no_tourist_foreign, revenue_all
# and revenue_foreign - TODO confirm against the data.
pivot_df = df.groupBy("date", "province_eng").pivot("variable").sum("value")

# Foreign share of tourists and of revenue, as a ratio rounded to 4 decimals.
# The F.when guard yields NULL when the denominator is zero, so the division
# cannot raise a divide-by-zero error when ANSI SQL mode is enabled; with ANSI
# mode off the result is the same NULL the plain division would give.
tourists_all = F.col("no_tourist_all")
revenue_all = F.col("revenue_all")

foreign_tourist_share = F.when(
    tourists_all != 0, F.col("no_tourist_foreign") / tourists_all
)
foreign_revenue_share = F.when(
    revenue_all != 0, F.col("revenue_foreign") / revenue_all
)

# Calculate Required Aggregations
aggregated_df = pivot_df.withColumn(
    "no_percentage_of_foreign_tourists",
    F.round(foreign_tourist_share, 4)
).withColumn(
    "revenue_percentage_of_foreign_tourists",
    F.round(foreign_revenue_share, 4)
).select(
    "date", "province_eng", "no_percentage_of_foreign_tourists", "revenue_percentage_of_foreign_tourists"
)

# Show Aggregated Data
aggregated_df.show(5)

# Write to JSON, replacing any previous output at this path
aggregated_df.write.mode("overwrite").json("output/tourism_aggregated.json")

# Stop Spark Session
spark.stop()



root
 |-- date: timestamp_ntz (nullable = true)
 |-- province_thai: string (nullable = true)
 |-- province_eng: string (nullable = true)
 |-- region_thai: string (nullable = true)
 |-- region_eng: string (nullable = true)
 |-- variable: string (nullable = true)
 |-- value: double (nullable = true)

+-------------------+---------------+--------------------+-----------+----------+------------------+-----+
|               date|  province_thai|        province_eng|region_thai|region_eng|          variable|value|
+-------------------+---------------+--------------------+-----------+----------+------------------+-----+
|2019-01-01 00:00:00|  กรุงเทพมหานคร|             Bangkok|    ภาคกลาง|   central|ratio_tourist_stay|93.37|
|2019-01-01 00:00:00|         ลพบุรี|            Lopburi |    ภาคกลาง|   central|ratio_tourist_stay|61.32|
|2019-01-01 00:00:00|พระนครศรีอยุธยา|Phra Nakhon Si Ay...|    ภาคกลาง|   central|ratio_tourist_stay|73.37|
|2019-01-01 00:00:00|        สระบุรี|           Saraburi |