In [110]:
import os
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType
from pyspark.sql.functions import split, count, when, isnan, col, regexp_replace

In [84]:
spark = SparkSession.builder.appName('Airlines Delay').getOrCreate()
spark.sparkContext.setLogLevel("WARN")

In [85]:
"""
create table ontime (
  Year int,
  Month int,
  DayofMonth int,
  DayOfWeek int,
  DepTime  int,
  CRSDepTime int,
  ArrTime int,
  CRSArrTime int,
  UniqueCarrier varchar(5),
  FlightNum int,
  TailNum varchar(8),
  ActualElapsedTime int,
  CRSElapsedTime int,
  AirTime int,
  ArrDelay int,
  DepDelay int,
  Origin varchar(3),
  Dest varchar(3),
  Distance int,
  TaxiIn int,
  TaxiOut int,
  Cancelled int,
  CancellationCode varchar(1),
  Diverted varchar(1),
  CarrierDelay int,
  WeatherDelay int,
  NASDelay int,
  SecurityDelay int,
  LateAircraftDelay int
);

"""

'\ncreate table ontime (\n  Year int,\n  Month int,\n  DayofMonth int,\n  DayOfWeek int,\n  DepTime  int,\n  CRSDepTime int,\n  ArrTime int,\n  CRSArrTime int,\n  UniqueCarrier varchar(5),\n  FlightNum int,\n  TailNum varchar(8),\n  ActualElapsedTime int,\n  CRSElapsedTime int,\n  AirTime int,\n  ArrDelay int,\n  DepDelay int,\n  Origin varchar(3),\n  Dest varchar(3),\n  Distance int,\n  TaxiIn int,\n  TaxiOut int,\n  Cancelled int,\n  CancellationCode varchar(1),\n  Diverted varchar(1),\n  CarrierDelay int,\n  WeatherDelay int,\n  NASDelay int,\n  SecurityDelay int,\n  LateAircraftDelay int\n);\n\n'

In [86]:
schema = StructType([StructField('',IntegerType(),nullable=True),
                   StructField('Year',IntegerType(),nullable=True),
                   StructField('Month',IntegerType(),nullable=True),
                   StructField('DayofMonth',IntegerType(),nullable=True),
                   StructField('DayofWeek',IntegerType(),nullable=True),
                   StructField('DepTime',FloatType(),nullable=True),
                   StructField('CRSDepTime',IntegerType(),nullable=True),
                   StructField('ArrTime',FloatType(),nullable=True),
                   StructField('CRSArrTime',IntegerType(),nullable=True),
                   StructField('UniqueCarrier',StringType(),nullable=True),
                   StructField('FlightNum',IntegerType(),nullable=True),
                   StructField('TailNum',StringType(),nullable=True),
                   StructField('ActualElapsedTime',FloatType(),nullable=True),
                   StructField('CRSElapsedTime',FloatType(),nullable=True),
                   StructField('AirTime',FloatType(),nullable=True),
                   StructField('ArrDelay',FloatType(),nullable=True),
                   StructField('DepDelay',FloatType(),nullable=True),
                   StructField('Origin',StringType(),nullable=True),
                   StructField('Dest',StringType(),nullable=True),
                   StructField('Distance',IntegerType(),nullable=True),
                   StructField('TaxiIn',FloatType(),nullable=True),
                   StructField('TaxiOut',FloatType(),nullable=True),
                   StructField('Cancelled',IntegerType(),nullable=True),
                   StructField('CancellationCode',StringType(),nullable=True),
                   StructField('Diverted',StringType(),nullable=True),
                   StructField('CarrierDelay',FloatType(),nullable=True),
                   StructField('WeatherDelay',FloatType(),nullable=True),
                   StructField('NASDelay',FloatType(),nullable=True),
                   StructField('SecurityDelay',FloatType(),nullable=True),
                   StructField('LateAircraftDelay',FloatType(),nullable=True)]
                   )

In [87]:
!pwd

/home/bigdatapedia/00MyOwn


In [88]:
!hdfs dfs -mkdir /user/bigdatapedia/airline_csv

24/02/06 15:34:20 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 39.0 (TID 63) (datanode3 executor 2): TaskKilled (another attempt succeeded)


mkdir: `/user/bigdatapedia/airline_csv': File exists


In [89]:
!hdfs dfs -put /home/bigdatapedia/00MyOwn/owndata/DelayedFlights.csv /user/bigdatapedia/airline_csv

put: `/user/bigdatapedia/airline_csv/DelayedFlights.csv': File exists


In [90]:
!hdfs dfs -ls /user/bigdatapedia/airline_csv

Found 1 items
-rw-r--r--   3 bigdatapedia supergroup  247963212 2024-02-06 14:28 /user/bigdatapedia/airline_csv/DelayedFlights.csv


In [91]:
df_csv = spark.read.csv("hdfs:///user/bigdatapedia/airline_csv/", header= True, inferSchema=True)

                                                                                

In [92]:
df_csv.show(5,0)

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|_c0|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|0  |2008|1    |3         |4        |2003.0 |1955      |2211.0 |2225      |WN     

In [93]:
df_csv.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted

In [94]:
df_csv.describe().toPandas()

                                                                                

Unnamed: 0,summary,_c0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,...,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,count,1936758.0,1936758.0,1936758.0,1936758.0,1936758.0,1936758.0,1936758.0,1929648.0,1936758.0,...,1929648.0,1936303.0,1936758.0,1936758,1936758.0,1247488.0,1247488.0,1247488.0,1247488.0,1247488.0
1,mean,3341651.1513358923,2008.0,6.11110629206127,15.753470490376186,3.984827221573372,1518.5341168075722,1467.4726439751378,1610.140628757162,1634.224640868916,...,6.812975216205236,18.232202811233574,0.00032683484462178546,,0.0040035977649246,19.17939892006977,3.7035706956700185,15.021635478657911,0.0901371395957315,25.296466178432176
2,stddev,2066064.9575203664,1.1366495020701165e-13,3.4825463936660563,8.776272060384283,1.995966275051828,450.4852547937496,424.7667995772801,548.1781425365643,464.6347119906808,...,5.27359506452569,14.338534198201948,0.0180756242576749,,0.0631472171101076,43.54620724547461,21.49290048410913,33.83305216365359,2.0227140475055934,42.0548615209541
3,min,0.0,2008.0,1.0,1.0,1.0,1.0,0.0,1.0,0.0,...,0.0,0.0,0.0,A,0.0,0.0,0.0,0.0,0.0,0.0
4,max,7009727.0,2008.0,12.0,31.0,7.0,2400.0,2359.0,2400.0,2400.0,...,240.0,422.0,1.0,N,1.0,2436.0,1352.0,1357.0,392.0,1316.0


In [95]:
"""

Name Description

Year 1987-2008
Month 1-12
DayofMonth 1-31
DayOfWeek 1 (Monday) - 7 (Sunday)
DepTime actual departure time (local, hhmm)
CRSDepTime scheduled departure time (local, hhmm)
ArrTime actual arrival time (local, hhmm)
CRSArrTime scheduled arrival time (local, hhmm)
UniqueCarrier unique carrier code
FlightNum flight number
TailNum plane tail number
ActualElapsedTime in minutes
CRSElapsedTime in minutes
AirTime in minutes
ArrDelay arrival delay, in minutes
DepDelay departure delay, in minutes
Origin origin IATA airport code
Dest destination IATA airport code
Distance in miles
TaxiIn taxi in time, in minutes
TaxiOut taxi out time in minutes
Cancelled was the flight cancelled?
CancellationCode reason for cancellation (A = carrier, B = weather, C = NAS, D = security)
Diverted 1 = yes, 0 = no
CarrierDelay in minutes
WeatherDelay in minutes
NASDelay in minutes
SecurityDelay in minutes
LateAircraftDelay in minutes

"""

'\n\nName Description\n\nYear 1987-2008\nMonth 1-12\nDayofMonth 1-31\nDayOfWeek 1 (Monday) - 7 (Sunday)\nDepTime actual departure time (local, hhmm)\nCRSDepTime scheduled departure time (local, hhmm)\nArrTime actual arrival time (local, hhmm)\nCRSArrTime scheduled arrival time (local, hhmm)\nUniqueCarrier unique carrier code\nFlightNum flight number\nTailNum plane tail number\nActualElapsedTime in minutes\nCRSElapsedTime in minutes\nAirTime in minutes\nArrDelay arrival delay, in minutes\nDepDelay departure delay, in minutes\nOrigin origin IATA airport code\nDest destination IATA airport code\nDistance in miles\nTaxiIn taxi in time, in minutes\nTaxiOut taxi out time in minutes\nCancelled was the flight cancelled?\nCancellationCode reason for cancellation (A = carrier, B = weather, C = NAS, D = security)\nDiverted 1 = yes, 0 = no\nCarrierDelay in minutes\nWeatherDelay in minutes\nNASDelay in minutes\nSecurityDelay in minutes\nLateAircraftDelay in minutes\n\n'

In [96]:
df_edit1 =  df_csv.withColumnRenamed('DayofMonth','day_of_month').\
                withColumnRenamed('DayOfWeek','day_of_week').\
                withColumnRenamed('DepTime','actual_departure_time').\
                withColumnRenamed('CRSDepTime','scheduled_departure_time').\
                withColumnRenamed('ArrTime','actual_arrival_time').\
                withColumnRenamed('CRSArrTime','scheduled_arrival_time').\
                withColumnRenamed('UniqueCarrier','unique_carrier').\
                withColumnRenamed('FlightNum','flight_number').\
                withColumnRenamed('TailNum','plane_tail_number').\
                withColumnRenamed('ActualElapsedTime','actual_elapsed_time').\
                withColumnRenamed('CRSElapsedTime','scheduled_elapsed_time').\
                withColumnRenamed('AirTime','air_time').\
                withColumnRenamed('ArrDelay','arrival_delay').\
                withColumnRenamed('DepDelay','departure_delay').\
                withColumnRenamed('TaxiIn','taxi_in').\
                withColumnRenamed('TaxiOut','taxi_out').\
                withColumnRenamed('CancellationCode','cancellation_code').\
                withColumnRenamed('CarrierDelay','carrier_delay').\
                withColumnRenamed('WeatherDelay','weather_delay').\
                withColumnRenamed('NASDelay','nas_delay').\
                withColumnRenamed('SecurityDelay','security_delay').\
                withColumnRenamed('LateAircraftDelay','late_aircraft_delay')

In [97]:
for col in df_edit1.columns:
    df_edit1 = df_edit1.withColumnRenamed(col, col.lower())

In [98]:
df_edit1.columns

['_c0',
 'year',
 'month',
 'day_of_month',
 'day_of_week',
 'actual_departure_time',
 'scheduled_departure_time',
 'actual_arrival_time',
 'scheduled_arrival_time',
 'unique_carrier',
 'flight_number',
 'plane_tail_number',
 'actual_elapsed_time',
 'scheduled_elapsed_time',
 'air_time',
 'arrival_delay',
 'departure_delay',
 'origin',
 'dest',
 'distance',
 'taxi_in',
 'taxi_out',
 'cancelled',
 'cancellation_code',
 'diverted',
 'carrier_delay',
 'weather_delay',
 'nas_delay',
 'security_delay',
 'late_aircraft_delay']

In [99]:
df_edit2 = df_edit1.drop('')

In [100]:
df_edit2.limit(2).toPandas()

Unnamed: 0,_c0,year,month,day_of_month,day_of_week,actual_departure_time,scheduled_departure_time,actual_arrival_time,scheduled_arrival_time,unique_carrier,...,taxi_in,taxi_out,cancelled,cancellation_code,diverted,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay
0,0,2008,1,3,4,2003.0,1955,2211.0,2225,WN,...,4.0,8.0,0,N,0,,,,,
1,1,2008,1,3,4,754.0,735,1002.0,1000,WN,...,5.0,10.0,0,N,0,,,,,


In [101]:
from pyspark.sql.functions import col,isnan,when,count,concat, lit, to_date

In [102]:
# Create a list of expressions for each column
column_exprs = [
    count(when(col(c).contains('None') | col(c).contains('NULL') | (col(c) == '') |
              col(c).isNull() | isnan(col(c)), c)).alias(c)
    for c in df_edit2.columns
]

# Apply the list of expressions to select data from the DataFrame
df2 = df_edit2.select(column_exprs)

# Convert the resulting DataFrame to a Pandas DataFrame
df2.show(5,0)



+---+----+-----+------------+-----------+---------------------+------------------------+-------------------+----------------------+--------------+-------------+-----------------+-------------------+----------------------+--------+-------------+---------------+------+----+--------+-------+--------+---------+-----------------+--------+-------------+-------------+---------+--------------+-------------------+
|_c0|year|month|day_of_month|day_of_week|actual_departure_time|scheduled_departure_time|actual_arrival_time|scheduled_arrival_time|unique_carrier|flight_number|plane_tail_number|actual_elapsed_time|scheduled_elapsed_time|air_time|arrival_delay|departure_delay|origin|dest|distance|taxi_in|taxi_out|cancelled|cancellation_code|diverted|carrier_delay|weather_delay|nas_delay|security_delay|late_aircraft_delay|
+---+----+-----+------------+-----------+---------------------+------------------------+-------------------+----------------------+--------------+-------------+-----------------+----

                                                                                

In [103]:
df_edit2.count()

                                                                                

1936758

In [104]:
df_edit3 = df_edit2.withColumn('day_of_week', when(df_edit2.day_of_week == 1,'Monday').\
                                              when(df_edit2.day_of_week ==2,'Tuesday').\
                                              when(df_edit2.day_of_week ==3,'Wednesday').\
                                              when(df_edit2.day_of_week ==4,'Thursday').\
                                              when(df_edit2.day_of_week ==5,'Friday').\
                                              when(df_edit2.day_of_week ==6,'Saturday').\
                                              when(df_edit2.day_of_week ==7,'Sunday'))

In [105]:
df_edit3.limit(2000).toPandas()

Unnamed: 0,_c0,year,month,day_of_month,day_of_week,actual_departure_time,scheduled_departure_time,actual_arrival_time,scheduled_arrival_time,unique_carrier,...,taxi_in,taxi_out,cancelled,cancellation_code,diverted,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay
0,0,2008,1,3,Thursday,2003.0,1955,2211.0,2225,WN,...,4.0,8.0,0,N,0,,,,,
1,1,2008,1,3,Thursday,754.0,735,1002.0,1000,WN,...,5.0,10.0,0,N,0,,,,,
2,2,2008,1,3,Thursday,628.0,620,804.0,750,WN,...,3.0,17.0,0,N,0,,,,,
3,4,2008,1,3,Thursday,1829.0,1755,1959.0,1925,WN,...,3.0,10.0,0,N,0,2.0,0.0,0.0,0.0,32.0
4,5,2008,1,3,Thursday,1940.0,1915,2121.0,2110,WN,...,4.0,10.0,0,N,0,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1995,3145,2008,1,4,Friday,2038.0,2005,2226.0,2205,WN,...,3.0,11.0,0,N,0,8.0,0.0,0.0,0.0,13.0
1996,3147,2008,1,4,Friday,1629.0,1615,1828.0,1815,WN,...,7.0,13.0,0,N,0,,,,,
1997,3148,2008,1,4,Friday,1254.0,1225,1432.0,1350,WN,...,4.0,13.0,0,N,0,20.0,0.0,13.0,0.0,9.0
1998,3150,2008,1,4,Friday,1702.0,1625,1831.0,1750,WN,...,3.0,9.0,0,N,0,10.0,0.0,4.0,0.0,27.0


In [106]:
df_edit4 = df_edit3.withColumn('date', to_date(concat(col('day_of_month'), lit(' '), col('month'), lit(' '), col('year')), 'd M yyyy'))

In [107]:
df_edit4.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- actual_departure_time: double (nullable = true)
 |-- scheduled_departure_time: integer (nullable = true)
 |-- actual_arrival_time: double (nullable = true)
 |-- scheduled_arrival_time: integer (nullable = true)
 |-- unique_carrier: string (nullable = true)
 |-- flight_number: integer (nullable = true)
 |-- plane_tail_number: string (nullable = true)
 |-- actual_elapsed_time: double (nullable = true)
 |-- scheduled_elapsed_time: double (nullable = true)
 |-- air_time: double (nullable = true)
 |-- arrival_delay: double (nullable = true)
 |-- departure_delay: double (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- taxi_in: double (nullable = true)
 |-- taxi_out: double (nullable = true)
 |-- can

24/02/06 15:36:13 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 48.0 (TID 76) (datanode1 executor 1): TaskKilled (another attempt succeeded)


In [111]:
df_edit4.limit(5).toPandas()

                                                                                

Unnamed: 0,_c0,year,month,day_of_month,day_of_week,actual_departure_time,scheduled_departure_time,actual_arrival_time,scheduled_arrival_time,unique_carrier,...,taxi_out,cancelled,cancellation_code,diverted,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay,date
0,0,2008,1,3,Thursday,2003.0,1955,2211.0,2225,WN,...,8.0,0,N,0,,,,,,2008-01-03
1,1,2008,1,3,Thursday,754.0,735,1002.0,1000,WN,...,10.0,0,N,0,,,,,,2008-01-03
2,2,2008,1,3,Thursday,628.0,620,804.0,750,WN,...,17.0,0,N,0,,,,,,2008-01-03
3,4,2008,1,3,Thursday,1829.0,1755,1959.0,1925,WN,...,10.0,0,N,0,2.0,0.0,0.0,0.0,32.0,2008-01-03
4,5,2008,1,3,Thursday,1940.0,1915,2121.0,2110,WN,...,10.0,0,N,0,,,,,,2008-01-03


In [112]:
from pyspark.sql.functions import *
temp = df_edit4.withColumn('date',to_timestamp('date'))
temp.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- actual_departure_time: double (nullable = true)
 |-- scheduled_departure_time: integer (nullable = true)
 |-- actual_arrival_time: double (nullable = true)
 |-- scheduled_arrival_time: integer (nullable = true)
 |-- unique_carrier: string (nullable = true)
 |-- flight_number: integer (nullable = true)
 |-- plane_tail_number: string (nullable = true)
 |-- actual_elapsed_time: double (nullable = true)
 |-- scheduled_elapsed_time: double (nullable = true)
 |-- air_time: double (nullable = true)
 |-- arrival_delay: double (nullable = true)
 |-- departure_delay: double (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- taxi_in: double (nullable = true)
 |-- taxi_out: double (nullable = true)
 |-- can

In [113]:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window

In [114]:
window = Window.partitionBy('day_of_week').orderBy("date")
df_with_index = temp.withColumn("index", row_number().over(window))

In [115]:
df_with_index.limit(20).toPandas()   #duration: 43 s

24/02/06 15:42:20 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 55.0 (TID 84) (datanode3 executor 2): TaskKilled (Stage finished)
                                                                                

Unnamed: 0,_c0,year,month,day_of_month,day_of_week,actual_departure_time,scheduled_departure_time,actual_arrival_time,scheduled_arrival_time,unique_carrier,...,cancelled,cancellation_code,diverted,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay,date,index
0,94065,2008,1,2,Wednesday,1252.0,1223,1457.0,1428,XE,...,0,N,0,6.0,0.0,0.0,0.0,23.0,2008-01-02,1
1,94072,2008,1,2,Wednesday,1354.0,1230,1523.0,1345,XE,...,0,N,0,84.0,0.0,14.0,0.0,0.0,2008-01-02,2
2,94073,2008,1,2,Wednesday,747.0,705,1155.0,1058,XE,...,0,N,0,42.0,0.0,15.0,0.0,0.0,2008-01-02,3
3,94143,2008,1,2,Wednesday,1907.0,1840,4.0,2342,XE,...,0,N,0,10.0,0.0,0.0,0.0,12.0,2008-01-02,4
4,94201,2008,1,2,Wednesday,1358.0,1315,1637.0,1558,XE,...,0,N,0,14.0,0.0,0.0,0.0,25.0,2008-01-02,5
5,94383,2008,1,2,Wednesday,1548.0,1400,1841.0,1706,XE,...,0,N,0,21.0,0.0,0.0,0.0,74.0,2008-01-02,6
6,94413,2008,1,2,Wednesday,1935.0,1845,2050.0,2019,XE,...,0,N,0,0.0,0.0,0.0,0.0,31.0,2008-01-02,7
7,94439,2008,1,2,Wednesday,847.0,830,1137.0,1136,XE,...,0,N,0,,,,,,2008-01-02,8
8,94500,2008,1,2,Wednesday,1223.0,955,1459.0,1224,XE,...,0,N,0,0.0,0.0,7.0,0.0,148.0,2008-01-02,9
9,94531,2008,1,2,Wednesday,944.0,915,1252.0,1224,XE,...,0,N,0,28.0,0.0,0.0,0.0,0.0,2008-01-02,10


In [116]:
df_with_index.show(5,0) ## duration: 24 s 

24/02/06 15:44:00 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 61.0 (TID 112) (datanode3 executor 2): TaskKilled (Stage finished)

+-----+----+-----+------------+-----------+---------------------+------------------------+-------------------+----------------------+--------------+-------------+-----------------+-------------------+----------------------+--------+-------------+---------------+------+----+--------+-------+--------+---------+-----------------+--------+-------------+-------------+---------+--------------+-------------------+-------------------+-----+
|_c0  |year|month|day_of_month|day_of_week|actual_departure_time|scheduled_departure_time|actual_arrival_time|scheduled_arrival_time|unique_carrier|flight_number|plane_tail_number|actual_elapsed_time|scheduled_elapsed_time|air_time|arrival_delay|departure_delay|origin|dest|distance|taxi_in|taxi_out|cancelled|cancellation_code|diverted|carrier_delay|weather_delay|nas_delay|security_delay|late_aircraft_delay|date               |index|
+-----+----+-----+------------+-----------+---------------------+------------------------+-------------------+----------------

                                                                                

In [117]:
!hdfs dfs -mkdir /user/bigdatapedia/airline_parq


In [120]:
!hdfs dfs -ls /user/bigdatapedia/airline_parq/

In [122]:
!pwd

/home/bigdatapedia/00MyOwn


In [121]:
df_with_index.write.partitionBy("day_of_week").mode("overwrite").parquet("/user/bigdatapedia/airline_parq/")


                                                                                

In [123]:
parDF=spark.read.parquet("/user/bigdatapedia/airline_parq/")

In [124]:
parDF.limit(5).toPandas()

                                                                                

Unnamed: 0,_c0,year,month,day_of_month,actual_departure_time,scheduled_departure_time,actual_arrival_time,scheduled_arrival_time,unique_carrier,flight_number,...,cancellation_code,diverted,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay,date,index,day_of_week
0,2367,2008,1,4,2003.0,1845,2207.0,2040,WN,746,...,N,0,0.0,0.0,9.0,0.0,78.0,2008-01-04,1,Friday
1,2369,2008,1,4,1444.0,1435,1712.0,1710,WN,45,...,N,0,,,,,,2008-01-04,2,Friday
2,2371,2008,1,4,2203.0,2015,30.0,2240,WN,230,...,N,0,1.0,0.0,2.0,0.0,107.0,2008-01-04,3,Friday
3,2373,2008,1,4,1137.0,1130,1409.0,1405,WN,438,...,N,0,,,,,,2008-01-04,4,Friday
4,2374,2008,1,4,1840.0,1810,2115.0,2045,WN,972,...,N,0,12.0,0.0,0.0,0.0,18.0,2008-01-04,5,Friday
