In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import spark_partition_id,col
import util.config as conf
from util.logger import Log4j
from pyspark.sql import functions as f


In [2]:
# Build (or reuse) the Spark session from the configuration returned by util.config.
spark_conf = conf.get_spark_conf()
spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .getOrCreate()
)

# Load the flight-time dataset from its Parquet source.
flight_time_df = spark.read.parquet("/data/source-and-sink/flight-time.parquet")

# Report the partition count of the freshly loaded DataFrame through the Log4j wrapper.
log = Log4j(spark)
log.info(f"Num Partitions before: {flight_time_df.rdd.getNumPartitions()}")

util_dir: /spark/08-source-and-sink/util


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/30 16:13:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
# Display the number of partitions of the RDD backing flight_time_df (cell output).
flight_time_df.rdd.getNumPartitions()

2

In [10]:
# Add a YEAR column extracted from FL_DATE and print the resulting schema.
# FL_DATE is already a date column (see the schema printed by this cell), so
# f.year() is applied to it directly. The previous to_date(..., 'YYYY-MM-DD')
# wrapper was redundant and used an incorrect pattern: 'Y' means week-based
# year and 'D' means day-of-year; the calendar pattern would be 'yyyy-MM-dd'.
# NOTE(review): the variable name 'fligh_time_df' looks like a typo of
# 'flight_time_df'; it is kept unchanged in case later cells reference it.
fligh_time_df = flight_time_df.withColumn('YEAR', f.year(col('FL_DATE')))
fligh_time_df.printSchema()

root
 |-- FL_DATE: date (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: integer (nullable = true)
 |-- CANCELLED: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- YEAR: integer (nullable = true)



In [16]:
# Cancelled flights arriving in Atlanta, GA during the year 2000, newest first.
# The year is taken straight from FL_DATE with f.year(); wrapping the column in
# to_date(..., 'YYYY-MM-DD') was unnecessary and used an incorrect pattern
# ('Y' is week-based year, 'D' is day-of-year).
res_df = (
    flight_time_df
    .filter(
        (col('CANCELLED') == 1)  # compare against an integer
        & (col('DEST_CITY_NAME') == "Atlanta, GA")
        & (f.year(col('FL_DATE')) == 2000)
    )
    .select('DEST', 'DEST_CITY_NAME', 'FL_DATE', 'ORIGIN', 'ORIGIN_CITY_NAME', 'CANCELLED')
    .orderBy(col('FL_DATE').desc())
)
res_df.show()
               

+----+--------------+----------+------+--------------------+---------+
|DEST|DEST_CITY_NAME|   FL_DATE|ORIGIN|    ORIGIN_CITY_NAME|CANCELLED|
+----+--------------+----------+------+--------------------+---------+
| ATL|   Atlanta, GA|2000-01-31|   GSO|Greensboro/High P...|        1|
| ATL|   Atlanta, GA|2000-01-31|   MIA|           Miami, FL|        1|
| ATL|   Atlanta, GA|2000-01-31|   GSP|           Greer, SC|        1|
| ATL|   Atlanta, GA|2000-01-31|   IAD|      Washington, DC|        1|
| ATL|   Atlanta, GA|2000-01-31|   IAD|      Washington, DC|        1|
| ATL|   Atlanta, GA|2000-01-31|   IAD|      Washington, DC|        1|
| ATL|   Atlanta, GA|2000-01-31|   LGA|        New York, NY|        1|
| ATL|   Atlanta, GA|2000-01-31|   LGA|        New York, NY|        1|
| ATL|   Atlanta, GA|2000-01-31|   RDU|  Raleigh/Durham, NC|        1|
| ATL|   Atlanta, GA|2000-01-31|   RIC|        Richmond, VA|        1|
| ATL|   Atlanta, GA|2000-01-31|   RIC|        Richmond, VA|        1|
| ATL|

In [18]:
# Count cancelled flights for every (carrier, origin airport) pair and show
# the result ordered by carrier, then origin.
group_keys = [col('OP_CARRIER'), col('ORIGIN')]
agg_df = (
    flight_time_df
    .where(col('CANCELLED') == 1)
    .groupBy(*group_keys)
    .agg(f.count('*').alias('NUM_CANCELLED_FLIGHT'))
    .orderBy(*group_keys)
)
agg_df.show()

+----------+------+--------------------+
|OP_CARRIER|ORIGIN|NUM_CANCELLED_FLIGHT|
+----------+------+--------------------+
|        AA|   ABQ|                   4|
|        AA|   ALB|                   6|
|        AA|   AMA|                   2|
|        AA|   ATL|                  30|
|        AA|   AUS|                  25|
|        AA|   BDL|                  33|
|        AA|   BHM|                   4|
|        AA|   BNA|                  19|
|        AA|   BOS|                 198|
|        AA|   BUF|                   3|
|        AA|   BUR|                   1|
|        AA|   BWI|                  15|
|        AA|   CLE|                   9|
|        AA|   CLT|                  18|
|        AA|   CMH|                   4|
|        AA|   COS|                   8|
|        AA|   CVG|                   5|
|        AA|   DAY|                   1|
|        AA|   DCA|                 112|
|        AA|   DEN|                  18|
+----------+------+--------------------+
only showing top