In [1]:
import pandas as pd

In [11]:
# Quick pandas look at the raw air-quality export (first five rows).
df = pd.read_csv(filepath_or_buffer='../raw_data/air_quality_data.csv')
df.head(n=5)

Unnamed: 0,aqi,co,datetime,no2,o3,pm10,pm25,so2,timestamp_local,timestamp_utc,ts
0,157,177.0,2023-11-17:09,5.0,112.0,61.0,61.0,31.0,2023-11-17T16:00:00,2023-11-17T09:00:00,1700211600
1,104,185.7,2023-11-17:08,4.0,125.7,37.0,37.0,37.3,2023-11-17T15:00:00,2023-11-17T08:00:00,1700208000
2,128,194.3,2023-11-17:07,3.0,139.3,45.7,45.67,43.7,2023-11-17T14:00:00,2023-11-17T07:00:00,1700204400
3,107,203.0,2023-11-17:06,2.0,153.0,38.0,38.0,50.0,2023-11-17T13:00:00,2023-11-17T06:00:00,1700200800
4,137,223.7,2023-11-17:05,2.7,149.3,49.0,49.0,57.0,2023-11-17T12:00:00,2023-11-17T05:00:00,1700197200


In [4]:
# Quick pandas look at the raw traffic-jam export (first five rows).
df = pd.read_csv(filepath_or_buffer='../raw_data/traffic_jam_data.csv')
df.head(n=5)

Unnamed: 0,jam_id,type,level,severity,line_coordinates,start_location,end_location,speed_kmh,length_meters,delay_seconds,block_alert_id,block_alert_type,block_alert_description,block_alert_update_datetime_utc,block_start_datetime_utc,publish_datetime_utc,update_datetime_utc,country,city,street
0,1306709498,NONE,2,5,"[{'lat': -6.3498, 'lon': 106.749779}, {'lat': ...",,,21.06,1263,111,,,,,,2023-11-17T09:03:41.328Z,2023-11-17T10:30:24.539Z,ID,Tangerang Selatan,N6 RE Martadinata
1,1310361102,NONE,4,5,"[{'lat': -6.123958, 'lon': 106.704834}, {'lat'...",,,4.77,278,177,,,,,,2023-11-17T09:55:00.340Z,2023-11-17T10:30:25.000Z,ID,Jakarta Barat,Citra Garden 5
2,1312734760,NONE,3,5,"[{'lat': -6.149167, 'lon': 106.847985}, {'lat'...",,,8.41,347,109,,,,,,2023-11-17T10:21:19.094Z,2023-11-17T10:30:27.341Z,ID,Jakarta Pusat,Kawasan PRJ Kemayoran
3,1312796200,NONE,3,5,"[{'lat': -6.305742, 'lon': 106.859124}, {'lat'...",,,5.33,373,187,,,,,,2023-11-17T10:21:18.419Z,2023-11-17T10:30:26.700Z,ID,Jakarta Timur,Kesehatan
4,1313179214,NONE,2,5,"[{'lat': -6.204239, 'lon': 106.872368}, {'lat'...",,,12.87,610,84,,,,,,2023-11-17T10:25:11.798Z,2023-11-17T10:30:24.649Z,ID,Jakarta Timur,Kramat Asem


# Transformation (TODO)

# Air Quality

# Columns to drop: `datetime`, `timestamp_local`, `ts`


In [53]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, when, lit, lag, lead
from pyspark.sql.window import Window

# Create a SparkSession
# Start (or attach to) this notebook's SparkSession.
spark = SparkSession.builder.appName("CSV Reader").getOrCreate()

# Load the raw traffic-jam export: the first line holds column names and the
# column types are inferred from the data (an extra pass over the file).
file_path = "../raw_data/traffic_jam_data.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file_path)
)

def replace_outliers_with_mean_adjacent(df, max_values, order_col='timestamp_utc'):
    '''Replace out-of-range values with the mean of their adjacent rows.

    A value in column ``c`` is treated as an outlier when it is negative or
    greater than ``max_values[c]``. An outlier is replaced by the mean of the
    previous and next row's value, with rows ordered by ``order_col``. If only
    one neighbour has a value (first/last row, or a null neighbour), that
    neighbour's value is used; if neither has one, the result is null.
    Non-outlier values, including nulls, are kept unchanged.

    Args:
        df: Spark DataFrame to clean.
        max_values: dict mapping column name -> largest valid value.
        order_col: column that defines which rows are adjacent.
            Defaults to 'timestamp_utc'.

    Returns:
        A new DataFrame with the listed columns cleaned.
    '''
    # No partitionBy: Spark evaluates this window over the whole dataset in a
    # single partition, which is fine for small data but does not scale.
    # Neighbours are read from the column *before* replacement, so an
    # outlier's neighbour may itself be an outlier.
    window_spec = Window.orderBy(order_col)

    for col_name, max_val in max_values.items():
        current = col(col_name)
        prev_val = lag(col_name).over(window_spec)
        next_val = lead(col_name).over(window_spec)
        # Don't default a missing neighbour to 0 (that halves the replacement
        # at the first/last row); fall back to the other neighbour instead.
        neighbour_mean = (
            when(prev_val.isNull(), next_val)
            .when(next_val.isNull(), prev_val)
            .otherwise((prev_val + next_val) / 2.0)
        )
        is_outlier = (current < 0) | (current > max_val)
        df = df.withColumn(col_name, when(is_outlier, neighbour_mean).otherwise(current))

    return df

# Cleaning data:
#   * drop columns not needed downstream,
#   * expose publish_datetime_utc under the name timestamp_utc,
#   * remove rows that are entirely null, then exact duplicate rows.
columns_to_drop = ['line_coordinates', 'update_datetime_utc']
df = (
    df.drop(*columns_to_drop)
    .withColumnRenamed("publish_datetime_utc", "timestamp_utc")
    .dropna(how='all')
    .dropDuplicates()
)

# Target Spark SQL type for each remaining column.
# NOTE(review): sampled jam_id values (~1.3e9) sit close to the 32-bit
# 'integer' limit; consider 'long' if ids keep growing.
dtypes = {
    'jam_id': 'integer',
    'type': 'string',
    'level': 'integer',
    'severity': 'integer',
    'speed_kmh': 'double',
    'length_meters': 'integer',
    'delay_seconds': 'integer',
    'start_location': 'string',
    'end_location': 'string',
    'block_alert_id': 'integer',
    'block_alert_type': 'string',
    'block_alert_description': 'string',
    'block_alert_update_datetime_utc': 'timestamp',
    'block_start_datetime_utc': 'timestamp',
    'timestamp_utc': 'timestamp',
    'country': 'string',
    'city': 'string',
    'street': 'string',
}

def consistent_dtype(df, dtype):
    '''Cast columns of a Spark DataFrame to the given types.

    Args:
        df: Spark DataFrame.
        dtype: dict mapping column name -> Spark SQL type name accepted by
            ``Column.cast`` (e.g. 'integer', 'double', 'string', 'timestamp').
            Keys are expected to be existing columns of ``df``.

    Returns:
        A new DataFrame with those columns cast; other columns are untouched.
    '''
    # Iterate pairs directly and avoid shadowing the builtin ``type``.
    for col_name, spark_type in dtype.items():
        df = df.withColumn(col_name, col(col_name).cast(spark_type))
    return df

df = consistent_dtype(df, dtypes)

# Consistent datetime format
df = df.withColumn('timestamp_utc', to_timestamp('timestamp_utc', 'yyyy-MM-dd HH:mm:ss'))

# Handle out-of-range values. These limits are for the air-quality pollutant
# columns; apply only those present in this DataFrame, because referencing a
# missing column (e.g. 'co' on the traffic data read above) fails analysis.
max_val = {'co': 150000.0, 'no2': 3840.0, 'o3': 1200.0, 'pm10': 600.0, 'pm25': 500.0, 'so2': 800.0}
present_limits = {name: limit for name, limit in max_val.items() if name in df.columns}
df = replace_outliers_with_mean_adjacent(df, present_limits)

# Show the DataFrame schema and preview the data
df.printSchema()
df.show()
print(df.columns)


root
 |-- aqi: integer (nullable = true)
 |-- co: double (nullable = true)
 |-- no2: double (nullable = true)
 |-- o3: double (nullable = true)
 |-- pm10: double (nullable = true)
 |-- pm25: double (nullable = true)
 |-- so2: double (nullable = true)
 |-- timestamp_utc: timestamp (nullable = true)

+---+-----+----+-----+----+-----+----+-------------------+
|aqi|   co| no2|   o3|pm10| pm25| so2|      timestamp_utc|
+---+-----+----+-----+----+-----+----+-------------------+
| 94|265.0| 8.7|113.7|47.0|32.33|37.7|2023-11-14 10:00:00|
|103|339.0|13.3| 95.3|53.0|36.67|39.3|2023-11-14 11:00:00|
|115|413.0|18.0| 77.0|59.0| 41.0|41.0|2023-11-14 12:00:00|
|118|420.0|16.7| 72.7|60.7| 42.0|38.3|2023-11-14 13:00:00|
|121|427.0|15.3| 68.3|62.3| 43.0|35.7|2023-11-14 14:00:00|
|123|434.0|14.0| 64.0|64.0| 44.0|33.0|2023-11-14 15:00:00|
|130|419.7|14.3| 62.3|67.7|46.67|38.7|2023-11-14 16:00:00|
|138|405.3|14.7| 60.7|71.3|49.33|44.3|2023-11-14 17:00:00|
|145|391.0|15.0| 59.0|75.0| 52.0|50.0|2023-11-14 18

In [92]:
from pyspark.sql import SparkSession

# Start (or attach to) this notebook's SparkSession.
spark = SparkSession.builder.appName("CSV Reader").getOrCreate()

# Traffic-jam export: header row, column types inferred from the data.
file_path = "../raw_data/traffic_jam_data.csv"
df = spark.read.option("header", True).option("inferSchema", True).csv(file_path)

# Print the inferred schema as a tree.
df.printSchema()

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, when, lit, lag, lead
from pyspark.sql.window import Window

def replace_outliers_with_mean_adjacent(df, max_values, order_col='timestamp_utc'):
    '''Replace out-of-range values with the mean of their adjacent rows.

    A value in column ``c`` is treated as an outlier when it is negative or
    greater than ``max_values[c]``. An outlier is replaced by the mean of the
    previous and next row's value, with rows ordered by ``order_col``. If only
    one neighbour has a value (first/last row, or a null neighbour), that
    neighbour's value is used; if neither has one, the result is null.
    Non-outlier values, including nulls, are kept unchanged.

    Args:
        df: Spark DataFrame to clean.
        max_values: dict mapping column name -> largest valid value.
        order_col: column that defines which rows are adjacent.
            Defaults to 'timestamp_utc'.

    Returns:
        A new DataFrame with the listed columns cleaned.
    '''
    # No partitionBy: Spark evaluates this window over the whole dataset in a
    # single partition, which is fine for small data but does not scale.
    # Neighbours are read from the column *before* replacement, so an
    # outlier's neighbour may itself be an outlier.
    window_spec = Window.orderBy(order_col)

    for col_name, max_val in max_values.items():
        current = col(col_name)
        prev_val = lag(col_name).over(window_spec)
        next_val = lead(col_name).over(window_spec)
        # Don't default a missing neighbour to 0 (that halves the replacement
        # at the first/last row); fall back to the other neighbour instead.
        neighbour_mean = (
            when(prev_val.isNull(), next_val)
            .when(next_val.isNull(), prev_val)
            .otherwise((prev_val + next_val) / 2.0)
        )
        is_outlier = (current < 0) | (current > max_val)
        df = df.withColumn(col_name, when(is_outlier, neighbour_mean).otherwise(current))

    return df

def consistent_dtype(df, dtype):
    '''Cast columns of a Spark DataFrame to the given types.

    Args:
        df: Spark DataFrame.
        dtype: dict mapping column name -> Spark SQL type name accepted by
            ``Column.cast`` (e.g. 'integer', 'double', 'string', 'timestamp').
            Keys are expected to be existing columns of ``df``.

    Returns:
        A new DataFrame with those columns cast; other columns are untouched.
    '''
    # Iterate pairs directly and avoid shadowing the builtin ``type``.
    for col_name, spark_type in dtype.items():
        df = df.withColumn(col_name, col(col_name).cast(spark_type))
    return df

def saveToCSV(df, path='/transform_data', mode='append'):
    '''Write ``df`` as CSV (with a header row) to ``path``; best effort.

    A failure is reported on stdout instead of being raised; the return value
    tells the caller whether the write succeeded.

    Args:
        df: Spark DataFrame to write.
        path: output directory (Spark writes part-files into it).
            NOTE(review): the default '/transform_data' is an absolute path at
            the filesystem root, unlike the relative '../raw_data' inputs --
            confirm this is intended.
        mode: Spark save mode. The default 'append' adds another copy of the
            data on every run.

    Returns:
        True if the write completed, False if it raised.
    '''
    try:
        df.write.csv(path, header=True, mode=mode)
    except Exception as e:  # Spark failures surface as Py4J/Analysis errors, not IOError
        print(f"Error while saving data: {e}")
        return False
    print("Data successfully saved")
    return True

# Transformation process start
if __name__ == "__main__":
    # Create (or reuse) a SparkSession
    spark = SparkSession.builder.appName("CSV Reader").getOrCreate()

    # Read CSV file into a Spark DataFrame
    file_path = "../raw_data/traffic_jam_data.csv"
    df = spark.read.csv(file_path, header=True, inferSchema=True)

    # Drop unnecessary columns. Only the raw geometry is dropped for now; the
    # location and block_alert_* columns are kept (and typed below) while the
    # final column set is still being decided. (An earlier draft also dropped
    # start/end_location, all block_alert_* / block_start_* columns and
    # update_datetime_utc.)
    columns_to_drop = ['line_coordinates']
    df = df.drop(*columns_to_drop)

    # Rename publish_datetime_utc to timestamp_utc
    df = df.withColumnRenamed("publish_datetime_utc", "timestamp_utc")

    # Drop rows in which every value is missing
    df = df.dropna(how='all')

    # Drop exact duplicate rows
    df = df.dropDuplicates()

    # Cast every typed column to its expected Spark SQL type.
    # NOTE(review): sampled jam_id values (~1.3e9) are close to the 32-bit
    # 'integer' limit; consider 'long' if ids keep growing.
    dtypes = {
        'jam_id': 'integer',
        'type': 'string',
        'level': 'integer',
        'severity': 'integer',
        'speed_kmh': 'double',
        'length_meters': 'integer',
        'delay_seconds': 'integer',
        'start_location': 'string',
        'end_location': 'string',
        'block_alert_id': 'integer',
        'block_alert_type': 'string',
        'block_alert_description': 'string',
        'block_alert_update_datetime_utc': 'timestamp',
        'block_start_datetime_utc': 'timestamp',
        'timestamp_utc': 'timestamp',
        'country': 'string',
        'city': 'string',
        'street': 'string'}
    df = consistent_dtype(df, dtypes)

    # Consistent datetime format
    df = df.withColumn('timestamp_utc', to_timestamp('timestamp_utc', 'yyyy-MM-dd HH:mm:ss'))
    df = df.withColumn('block_alert_update_datetime_utc', to_timestamp('block_alert_update_datetime_utc', 'yyyy-MM-dd HH:mm:ss'))
    df = df.withColumn('block_start_datetime_utc', to_timestamp('block_start_datetime_utc', 'yyyy-MM-dd HH:mm:ss'))

    # Outlier limits are not applied here: they target the air-quality
    # pollutant columns (co, no2, o3, pm10, pm25, so2), which this traffic
    # DataFrame does not have.

    # Preview rows with a non-zero block_alert_id. Rows whose block_alert_id
    # is null are excluded as well, since a comparison with null is not true.
    df.filter(df['block_alert_id'] != 0).show()

# TODO: saving to CSV still fails; also decide whether this data needs further
# transformation before it is saved.

+----------+----+-----+--------+--------------+------------+---------+-------------+-------------+--------------+-----------------+-----------------------+-------------------------------+------------------------+--------------------+--------------------+-------+-----------------+--------------------+
|    jam_id|type|level|severity|start_location|end_location|speed_kmh|length_meters|delay_seconds|block_alert_id| block_alert_type|block_alert_description|block_alert_update_datetime_utc|block_start_datetime_utc|       timestamp_utc| update_datetime_utc|country|             city|              street|
+----------+----+-----+--------+--------------+------------+---------+-------------+-------------+--------------+-----------------+-----------------------+-------------------------------+------------------------+--------------------+--------------------+-------+-----------------+--------------------+
|1283271796|NONE|    5|       5|          NULL|        NULL|      0.0|          633|          

In [2]:
# Transform Air Quality
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, when, lit, lag, lead
from pyspark.sql.window import Window

def replace_outliers_with_mean_adjacent(df, max_values, order_col='timestamp_utc'):
    '''Replace out-of-range values with the mean of their adjacent rows.

    A value in column ``c`` is treated as an outlier when it is negative or
    greater than ``max_values[c]``. An outlier is replaced by the mean of the
    previous and next row's value, with rows ordered by ``order_col``. If only
    one neighbour has a value (first/last row, or a null neighbour), that
    neighbour's value is used; if neither has one, the result is null.
    Non-outlier values, including nulls, are kept unchanged.

    Args:
        df: Spark DataFrame to clean.
        max_values: dict mapping column name -> largest valid value.
        order_col: column that defines which rows are adjacent.
            Defaults to 'timestamp_utc'.

    Returns:
        A new DataFrame with the listed columns cleaned.
    '''
    # No partitionBy: Spark evaluates this window over the whole dataset in a
    # single partition, which is fine for small data but does not scale.
    # Neighbours are read from the column *before* replacement, so an
    # outlier's neighbour may itself be an outlier.
    window_spec = Window.orderBy(order_col)

    for col_name, max_val in max_values.items():
        current = col(col_name)
        prev_val = lag(col_name).over(window_spec)
        next_val = lead(col_name).over(window_spec)
        # Don't default a missing neighbour to 0 (that halves the replacement
        # at the first/last row); fall back to the other neighbour instead.
        neighbour_mean = (
            when(prev_val.isNull(), next_val)
            .when(next_val.isNull(), prev_val)
            .otherwise((prev_val + next_val) / 2.0)
        )
        is_outlier = (current < 0) | (current > max_val)
        df = df.withColumn(col_name, when(is_outlier, neighbour_mean).otherwise(current))

    return df

def consistent_dtype(df, dtype):
    '''Cast columns of a Spark DataFrame to the given types.

    Args:
        df: Spark DataFrame.
        dtype: dict mapping column name -> Spark SQL type name accepted by
            ``Column.cast`` (e.g. 'integer', 'double', 'string', 'timestamp').
            Keys are expected to be existing columns of ``df``.

    Returns:
        A new DataFrame with those columns cast; other columns are untouched.
    '''
    # Iterate pairs directly and avoid shadowing the builtin ``type``.
    for col_name, spark_type in dtype.items():
        df = df.withColumn(col_name, col(col_name).cast(spark_type))
    return df

def saveToCSV(df, path='/transform_data', mode='append'):
    '''Write ``df`` as CSV (with a header row) to ``path``; best effort.

    A failure is reported on stdout instead of being raised; the return value
    tells the caller whether the write succeeded.

    Args:
        df: Spark DataFrame to write.
        path: output directory (Spark writes part-files into it).
            NOTE(review): the default '/transform_data' is an absolute path at
            the filesystem root, unlike the relative '../raw_data' inputs --
            confirm this is intended.
        mode: Spark save mode. The default 'append' adds another copy of the
            data on every run.

    Returns:
        True if the write completed, False if it raised.
    '''
    try:
        df.write.csv(path, header=True, mode=mode)
    except Exception as e:  # Spark failures surface as Py4J/Analysis errors, not IOError
        print(f"Error while saving data: {e}")
        return False
    print("Data successfully saved")
    return True

# Transformation process start
if __name__ == "__main__":
    spark = SparkSession.builder.appName("CSV Reader").getOrCreate()

    # Raw hourly air-quality export: header row, column types inferred from data.
    file_path = "../raw_data/air_quality_data.csv"
    df = (
        spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv(file_path)
    )

    # datetime, timestamp_local and ts are other encodings of the same hour as
    # timestamp_utc, so only timestamp_utc is kept.
    columns_to_drop = ['datetime', 'timestamp_local', 'ts']

    # Target Spark SQL type per column.
    dtypes = {
        'aqi': 'integer',
        'co': 'double',
        'no2': 'double',
        'o3': 'double',
        'pm10': 'double',
        'pm25': 'double',
        'so2': 'double',
        'timestamp_utc': 'timestamp',
    }

    # Upper limit per pollutant; values above it (or below 0) are treated as
    # outliers by replace_outliers_with_mean_adjacent.
    max_val = {
        'co': 150000.0,
        'no2': 3840.0,
        'o3': 1200.0,
        'pm10': 600.0,
        'pm25': 500.0,
        'so2': 800.0,
    }

    # Drop redundant columns, fully empty rows and exact duplicate rows.
    df = df.drop(*columns_to_drop).dropna(how='all').dropDuplicates()
    df = consistent_dtype(df, dtypes)
    df = df.withColumn('timestamp_utc', to_timestamp('timestamp_utc', 'yyyy-MM-dd HH:mm:ss'))
    df = replace_outliers_with_mean_adjacent(df, max_val)

    # Preview only -- nothing is written to disk yet.
    df.show()

# TODO: saving to CSV still fails.

+---+-----+----+-----+----+-----+----+-------------------+
|aqi|   co| no2|   o3|pm10| pm25| so2|      timestamp_utc|
+---+-----+----+-----+----+-----+----+-------------------+
| 94|265.0| 8.7|113.7|47.0|32.33|37.7|2023-11-14 10:00:00|
|103|339.0|13.3| 95.3|53.0|36.67|39.3|2023-11-14 11:00:00|
|115|413.0|18.0| 77.0|59.0| 41.0|41.0|2023-11-14 12:00:00|
|118|420.0|16.7| 72.7|60.7| 42.0|38.3|2023-11-14 13:00:00|
|121|427.0|15.3| 68.3|62.3| 43.0|35.7|2023-11-14 14:00:00|
|123|434.0|14.0| 64.0|64.0| 44.0|33.0|2023-11-14 15:00:00|
|130|419.7|14.3| 62.3|67.7|46.67|38.7|2023-11-14 16:00:00|
|138|405.3|14.7| 60.7|71.3|49.33|44.3|2023-11-14 17:00:00|
|145|391.0|15.0| 59.0|75.0| 52.0|50.0|2023-11-14 18:00:00|
|148|426.7|16.0| 53.0|76.0| 53.0|53.7|2023-11-14 19:00:00|
|150|462.3|17.0| 47.0|77.0| 54.0|57.3|2023-11-14 20:00:00|
|153|498.0|18.0| 41.0|78.0| 55.0|61.0|2023-11-14 21:00:00|
|153|565.0|19.0| 50.0|81.7|57.33|63.3|2023-11-14 22:00:00|
|156|632.0|20.0| 59.0|85.3|59.67|65.7|2023-11-14 23:00:0