In [None]:
import os
import findspark
import sys

# Set the Java / Spark / Hadoop home directories for this kernel, then let
# findspark initialise itself.
# NOTE(review): these are machine-specific absolute Windows paths; they must
# be adjusted before the notebook can run on another machine.
os.environ.update({
    "JAVA_HOME": "D:\\anmv2\\Environment\\Jdk1.8",
    "SPARK_HOME": "D:\\anmv2\\Environment\\Spark\\spark-3.5.3-bin-hadoop3",
    "HADOOP_HOME": "D:\\anmv2\\Environment\\Hadoop\\hadoop",
})

findspark.init()

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType

# Build the Spark configuration: local master and the application name.
conf = (
    SparkConf()
    .setMaster('local')
    .setAppName('Lab3 Source and Sink')
)

# Create (or reuse) the SparkSession from that configuration.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Keep a handle on the session's SparkContext.
sc = spark.sparkContext

# NOTE(review): DEBUG is a very verbose log level; consider a quieter level
# if the log output drowns out the cell results.
sc.setLogLevel("DEBUG")

In [None]:
# Input CSV read by the cells below. The path is relative, so it is presumably
# resolved against the notebook's working directory — confirm when relocating.
DATA_IN = 'data_lab3.1_source_and_sink.csv'

In [None]:
# Method 1
# Define the schema programmatically: one nullable field per CSV column,
# added in column order.
schema = (
    StructType()
    .add("FL_DATE", DateType(), True)
    .add("OP_CARRIER", StringType(), True)
    .add("OP_CARRIER_FL_NUM", IntegerType(), True)
    .add("ORIGIN", StringType(), True)
    .add("ORIGIN_CITY_NAME", StringType(), True)
    .add("DEST", StringType(), True)
    .add("DEST_CITY_NAME", StringType(), True)
    .add("CRS_DEP_TIME", IntegerType(), True)
    .add("DEP_TIME", IntegerType(), True)
    .add("WHEELS_ON", IntegerType(), True)
    .add("TAXI_IN", IntegerType(), True)
    .add("CRS_ARR_TIME", IntegerType(), True)
    .add("ARR_TIME", IntegerType(), True)
    .add("CANCELLED", IntegerType(), True)
    .add("DISTANCE", IntegerType(), True)
)

# Read the CSV with the explicit schema. The first line is a header,
# FL_DATE is parsed as yyyy-MM-dd, and FAILFAST mode is requested so that
# malformed records raise instead of being silently nulled.
df = (
    spark.read
    .format("csv")
    .schema(schema)
    .options(header="true", mode="FAILFAST", dateFormat="yyyy-MM-dd")
    .load(DATA_IN)
)

print("Schema create by Struct")
df.show(5)

In [None]:
# Method 2
# Define the same schema as a DDL string ("<column> <type>, ...").
str_schema = """
    FL_DATE DATE,
    OP_CARRIER STRING,
    OP_CARRIER_FL_NUM INT,
    ORIGIN STRING,
    ORIGIN_CITY_NAME STRING,
    DEST STRING,
    DEST_CITY_NAME STRING,
    CRS_DEP_TIME INT,
    DEP_TIME INT,
    WHEELS_ON INT,
    TAXI_IN INT,
    CRS_ARR_TIME INT,
    ARR_TIME INT,
    CANCELLED INT,
    DISTANCE INT
"""

print("Schema create by DDL string")

# Same reader settings as Method 1; only the schema is passed as a DDL
# string. Note that this rebinds `df`, which the following cells use.
df = (
    spark.read
    .format("csv")
    .schema(str_schema)
    .options(header="true", mode="FAILFAST", dateFormat="yyyy-MM-dd")
    .load(DATA_IN)
)

df.show(5)

In [None]:
# Inspect the current partitioning: row count per partition id, followed by
# the number of partitions.
from pyspark.sql.functions import spark_partition_id

df.groupBy(spark_partition_id()).count().show()
print(f"Num Partitions before: {df.rdd.getNumPartitions()}")

In [None]:
# Repartition the DataFrame into 5 partitions, then report the new partition
# count and the row count per partition id.

partitionedDF = df.repartition(5)
print(f"Num Partitions after: {partitionedDF.rdd.getNumPartitions()}")
partitionedDF.groupBy(spark_partition_id()).count().show()

In [None]:
# Write the repartitioned DataFrame as JSON to "lab3_output.json",
# overwriting any output already present at that path.
(
    partitionedDF.write
    .format("json")
    .mode("overwrite")
    .save("lab3_output.json")
)