# Czyszczenie zbiorów danych df_flights i df_weather, obsługa outlierów oraz wartości NULL

## Dostępy


In [0]:
import os
# Load credentials by executing the contents of a local text file.
# NOTE(review): exec() runs arbitrary code from 'tokens_and_api.txt' in the
# notebook namespace. `sas_token`, used below, is not defined anywhere in the
# visible code, so it is presumably created by this exec - confirm. Consider a
# secret scope or environment variable instead of exec.
with open('tokens_and_api.txt', 'r') as file:
    exec(file.read())
storage_account_name = "newadbprojektkakastorage"
container_name = "data"

# Configure the ABFS driver for this storage account to authenticate with SAS.
spark.conf.set(
    f"fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net", "SAS"
)
# Use the fixed (static) SAS token provider class.
spark.conf.set(
    f"fs.azure.sas.token.provider.type.{storage_account_name}.dfs.core.windows.net",
    "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider"
)
# Supply the SAS token itself.
spark.conf.set(
    f"fs.azure.sas.fixed.token.{storage_account_name}.dfs.core.windows.net",
    sas_token
)

# Root abfss:// URI that the read and write paths below are built from.
base_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net"
print(f"Skonfigurowano dostęp do: {base_path}")

## Wczytanie zbiorów danych

In [0]:
from pyspark.sql.functions import col, to_date, to_timestamp, month, dayofweek, hour, when, count, avg, round, lit, concat

# Reader options shared by both CSV inputs: the first row is the header and
# column types are inferred from the data.
csv_reader_options = {"header": "true", "inferSchema": "true"}

# Flights dataset.
df_flights = (
    spark.read.format("csv")
    .options(**csv_reader_options)
    .load(f"{base_path}/flights.csv")
)

# JFK weather dataset.
df_weather = (
    spark.read.format("csv")
    .options(**csv_reader_options)
    .load(f"{base_path}/jfk_weather_cleaned.csv")
)

# Quick sanity check: row count and a small sample of the flights data.
print("Liczba wierszy w flights:", df_flights.count())
display(df_flights.limit(5))

# Czyszczenie i inżynieria danych na zbiorze df_flights
## Filtrowanie po lotnisku "JFK"

In [0]:
# Keep only the flights whose origin airport code is "JFK".
is_jfk_origin = col("ORIGIN_AIRPORT") == "JFK"
df_jfk = df_flights.where(is_jfk_origin)

# Row count and a small sample of the filtered frame.
print("Liczba wierszy w df_jfk:", df_jfk.count())
display(df_jfk.limit(3))

## Usunięcie opóźnień spowodowanych przez inne czynniki niż pogoda

In [0]:
from pyspark.sql.functions import col, lit, coalesce

# Drop rows where any non-weather delay is greater than 0.
# A NULL delay is treated as 0, so flights with no recorded delay breakdown
# are kept as well. Only the columns needed downstream are selected.
df_weather_delays = df_jfk.filter(
    (coalesce(col("AIR_SYSTEM_DELAY"), lit(0)) == 0) &
    (coalesce(col("SECURITY_DELAY"), lit(0)) == 0) &
    (coalesce(col("AIRLINE_DELAY"), lit(0)) == 0) &
    (coalesce(col("LATE_AIRCRAFT_DELAY"), lit(0)) == 0)
).select("YEAR","MONTH","DAY","DAY_OF_WEEK","AIRLINE","FLIGHT_NUMBER","SCHEDULED_DEPARTURE","DEPARTURE_TIME","DEPARTURE_DELAY","DISTANCE","CANCELLED", "ORIGIN_AIRPORT","WEATHER_DELAY")

# Report the size of the FILTERED frame. The previous version counted df_jfk,
# i.e. the input of the filter, so the number never reflected this step.
print("Liczba wierszy:", df_weather_delays.count())
display(df_weather_delays.limit(5))

## Sprawdzenie duplikatów

In [0]:
# Duplicate check: compare the total row count with the count after removing
# fully identical rows.
rows_total = df_weather_delays.count()
rows_unique = df_weather_delays.dropDuplicates().count()
duplicate_count = rows_total - rows_unique
print(f"Liczba zduplikowanych wierszy: {duplicate_count}")

## Czyszczenie i zamiana na poprawny format daty i godziny

In [0]:
# Are the dates and times loaded as strings or as numbers?
# Print the inferred schema to find out before building timestamps.
df_weather_delays.printSchema()

In [0]:
from pyspark.sql.functions import col, lpad, concat, to_timestamp, lit, when, substring

from pyspark.sql.functions import coalesce, unix_timestamp


def clean_time_column(column_name):
    """Return a 4-character HHMM string expression for a time-of-day column.

    The column is cast to string and left-padded with '0' to four characters.
    The value '2400' is rewritten to '0000' because the 'HH' pattern used
    later only accepts hours 00-23.

    Args:
        column_name: Name of the column holding the time of day (e.g. 5 -> '0005').

    Returns:
        A Spark Column expression with the normalised HHMM string.
    """
    padded = lpad(col(column_name).cast("string"), 4, '0')
    return when(padded == '2400', '0000').otherwise(padded)


def _timestamp_on_flight_date(clean_time_col):
    """Combine YEAR/MONTH/DAY with an HHMM string column into a timestamp expression."""
    return to_timestamp(
        concat(
            col("YEAR"), lit("-"),
            lpad(col("MONTH"), 2, '0'), lit("-"),
            lpad(col("DAY"), 2, '0'), lit(" "),
            substring(col(clean_time_col), 1, 2), lit(":"),
            substring(col(clean_time_col), 3, 2), lit(":00")
        ),
        "yyyy-MM-dd HH:mm:ss"
    )


# Normalised HHMM strings for the scheduled and the actual departure time.
df_jfk_cleaned = df_weather_delays \
    .withColumn("CleanScheduled", clean_time_column("SCHEDULED_DEPARTURE")) \
    .withColumn("CleanDeparture", clean_time_column("DEPARTURE_TIME"))

# YEAR/MONTH/DAY describe the scheduled date, so the scheduled timestamp can be
# assembled directly. The actual departure may fall on a later calendar day
# (delay past midnight, or DEPARTURE_TIME == 2400); gluing DEPARTURE_TIME onto
# the scheduled date would then be about 24 hours too early. It is therefore
# derived as ScheduledTimestamp + DEPARTURE_DELAY, with the delay taken to be in
# minutes (the outlier check later in this notebook compares it with 6 * 60 for
# "6h"). The same-day parse is kept only as a fallback when that value is NULL.
# NOTE(review): assumes DEPARTURE_DELAY == actual - scheduled departure - confirm
# against the dataset description.
df_jfk_timestamps = df_jfk_cleaned.withColumn(
    "ScheduledTimestamp",
    _timestamp_on_flight_date("CleanScheduled")
).withColumn(
    "DeparturedTimestamp",
    coalesce(
        (unix_timestamp(col("ScheduledTimestamp")) + col("DEPARTURE_DELAY") * 60).cast("timestamp"),
        _timestamp_on_flight_date("CleanDeparture")
    )
)

# Drop the helper columns and the raw date/time parts that the timestamps replace.
df_jfk_timestamps_clean = df_jfk_timestamps.drop("CleanScheduled", "CleanDeparture", "YEAR", "MONTH", "DAY",
    "SCHEDULED_DEPARTURE", "DEPARTURE_TIME")

# Side-by-side view of the raw inputs and the derived timestamps.
print("Sprawdzenie konwersji czasu (bez błędu 24:00):")
display(df_jfk_timestamps.select(
    "YEAR", "MONTH", "DAY",
    "SCHEDULED_DEPARTURE", "ScheduledTimestamp",
    "DEPARTURE_TIME", "DeparturedTimestamp"
).limit(3))

## Obsługa wartości NULL

In [0]:
from pyspark.sql.functions import col, count, when

# Build one aggregate per column that counts the rows where it is NULL.
exprs = []
for column_name in df_jfk_timestamps_clean.columns:
    is_missing = col(column_name).isNull()
    exprs.append(count(when(is_missing, column_name)).alias(column_name))

# Single-row frame: one NULL count per column.
df_null_counts = df_jfk_timestamps_clean.select(*exprs)
row = df_null_counts.first()

# Names of the columns that contain at least one NULL.
cols_with_nulls = [name for name in df_null_counts.columns if row[name] > 0]

if not cols_with_nulls:
    print("Brak nulli.")
else:
    display(df_null_counts.select(*cols_with_nulls))

In [0]:
# NULL analysis.
# Check whether the gaps in 'DEPARTURE_DELAY' coincide with cancelled flights
# ('CANCELLED' == 1).
dep_delay_missing = col("DEPARTURE_DELAY").isNull()

# Overall number of rows with a NULL departure delay.
df_jfk_total_nulls = df_jfk_timestamps_clean.select(
    count(when(dep_delay_missing, 1)).alias("Total_Null_DepDelay")
)
display(df_jfk_total_nulls)

# The same breakdown per CANCELLED value: total rows, rows with a delay value,
# rows without one.
integrity_aggregates = [
    count("*").alias("Total"),
    count("DEPARTURE_DELAY").alias("NonNull_DepDelay"),
    count(when(dep_delay_missing, 1)).alias("Null_DepDelay"),
]
df_integrity = df_jfk_timestamps_clean.groupBy("CANCELLED").agg(*integrity_aggregates)
display(df_integrity)


Dla wszystkich pustych wartości w DEPARTURE_DELAY lot został odwołany.


## Analiza i obsługa outlierów

In [0]:
# Keep only the rows that have a departure delay value.
has_departure_delay = col("DEPARTURE_DELAY").isNotNull()
df_jfk_no_nulls = df_jfk_timestamps_clean.where(has_departure_delay)

display(df_jfk_no_nulls.limit(10))
print(f"Liczba wierszy: {df_jfk_no_nulls.count()}")

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Look for anomalies in the departure delays.
outlier_columns = [
    "AIRLINE", "ORIGIN_AIRPORT", "DEPARTURE_DELAY", "ScheduledTimestamp", "DeparturedTimestamp"
]
departure_delay = col("DEPARTURE_DELAY")

# Extremely low values (e.g. -30 and below).
outliers_low = df_jfk_timestamps_clean.where(departure_delay < -30).select(*outlier_columns)

# Extremely high values (e.g. more than 6h).
outliers_high = df_jfk_timestamps_clean.where(departure_delay > 6 * 60).select(*outlier_columns)

# Low outliers first, then high outliers.
outliers = outliers_low.union(outliers_high)
display(outliers)

display(df_jfk_timestamps_clean.limit(10))


In [0]:
# Descriptive statistics for the departure delays (before outlier filtering).
display(df_jfk_timestamps_clean.select("DEPARTURE_DELAY").summary())

In [0]:
# Compute the outlier bounds.
# NOTE: despite the "IQR" naming these are approximate 5th / 95th percentiles
# (relative error 0.01), not an interquartile range.
IQR = df_jfk_timestamps_clean.approxQuantile("DEPARTURE_DELAY", [0.05, 0.95], 0.01)
print(f"Granice IQR: {IQR[0]} - {IQR[1]}")
lower_bound = IQR[0]
upper_bound = IQR[1]

# Rows with a NULL delay fail both comparisons and are dropped here as well.
df_jfk_no_outliers = df_jfk_timestamps_clean.filter(
    (col("DEPARTURE_DELAY") >= lower_bound) &
    (col("DEPARTURE_DELAY") <= upper_bound)
)

print(f"Liczba wierszy przed: {df_jfk_timestamps_clean.count()}")
print(f"Liczba wierszy po: {df_jfk_no_outliers.count()}")


# Second variant: no effective upper bound (upper quantile = 1.0), so only the
# lowest 5% is trimmed.
IQR2 = df_jfk_timestamps_clean.approxQuantile("DEPARTURE_DELAY", [0.05, 1.0], 0.01)
# Use IQR2 here. The previous version reused IQR, which made this variant
# identical to df_jfk_no_outliers.
print(f"Granice IQR: {IQR2[0]} - {IQR2[1]}")
lower_bound = IQR2[0]
upper_bound = IQR2[1]

df_jfk_with_upper_outliers = df_jfk_timestamps_clean.filter(
    (col("DEPARTURE_DELAY") >= lower_bound) &
    (col("DEPARTURE_DELAY") <= upper_bound)
)

print(f"Liczba wierszy przed: {df_jfk_timestamps_clean.count()}")
print(f"Liczba wierszy po: {df_jfk_with_upper_outliers.count()}")


In [0]:
# Descriptive statistics for the departure delays after the percentile-based
# outlier filter.
display(df_jfk_no_outliers.select("DEPARTURE_DELAY").summary())

# Czyszczenie i inżynieria danych na zbiorze df_weather
## Filtrowanie po danych tylko z 2015 roku


In [0]:
from pyspark.sql.functions import col, count, desc,year 

# Keep only the weather observations whose DATE falls in 2015.
recorded_in_2015 = year(col("DATE")) == 2015
df_weather_2015 = df_weather.where(recorded_in_2015)

print("Próbka danych")
display(df_weather_2015.limit(3))

## Wybór interesujących nas kolumn - wpływ prędkości wiatru, widoczności, opadów atmosferycznych

In [0]:
# Observation time plus the three weather measures under study:
# precipitation, visibility and wind speed.
weather_columns = ["DATE", "HOURLYPrecip", "HOURLYVISIBILITY", "HOURLYWindSpeed"]
df_weather_impact = df_weather_2015.select(*weather_columns)

## Analiza wartości pustych i duplikatów

In [0]:

from pyspark.sql.functions import count, when

# One aggregate per column that counts the rows where it is NULL.
exprs_null = []
for column_name in df_weather_impact.columns:
    exprs_null.append(count(when(col(column_name).isNull(), column_name)).alias(column_name))

# Collect the counts to the driver and keep the (column, count) pairs with at
# least one NULL.
row_nulls = df_weather_impact.select(*exprs_null).first().asDict()
null_data = [item for item in row_nulls.items() if item[1] > 0]

print("Kolumny zawierające NULL-e")
if not null_data:
    print("Brak wartości NULL (uwaga: puste stringi '' nie są liczone jako NULL!)")
else:
    df_null_report = spark.createDataFrame(null_data, ["Kolumna", "Liczba_Nulli"])
    display(df_null_report.orderBy(col("Liczba_Nulli").desc()))

# Duplicate check: total rows versus fully distinct rows.
total_count = df_weather_impact.count()
deduplicated_weather = df_weather_impact.distinct()
distinct_count = deduplicated_weather.count()
duplicate_count = total_count - distinct_count

print(f"Całkowita liczba wierszy: {total_count}")
print(f"Liczba unikalnych wierszy: {distinct_count}")
print(f"Liczba zduplikowanych wierszy: {duplicate_count}")


# Połączenie obu zbiorów danych w jedno i zapisanie do pliku

In [0]:
from pyspark.sql.functions import col, to_timestamp, concat, lit, lpad, expr, hour, date_trunc

def _join_flights_with_weather(flights_df, weather_df):
    """Left-join flights to weather rows on the scheduled calendar day and hour.

    Args:
        flights_df: Flights frame with `ScheduledTimestamp` and `Hour` columns.
        weather_df: Weather frame with `DateOnly` and `WeatherHour` columns.

    Returns:
        The joined DataFrame; flights without a matching weather row are kept.
    """
    flight_day = to_timestamp(date_trunc("day", flights_df.ScheduledTimestamp))
    same_day = flight_day == weather_df.DateOnly
    same_hour = flights_df.Hour == weather_df.WeatherHour
    return flights_df.join(weather_df, same_day & same_hour, "left")


# Columns summarised after each join.
summary_columns = ["DEPARTURE_DELAY", "HOURLYVISIBILITY", "HOURLYWindSpeed", "HOURLYPrecip"]

# Hour of the scheduled departure is the flight-side join key.
df_flights_fixed = df_jfk_no_outliers.withColumn("Hour", hour(col("ScheduledTimestamp")))

# Weather-side join keys: hour of the observation and its day truncated to midnight.
df_weather_hourly = (
    df_weather_impact
    .withColumn("WeatherDate", col("DATE"))
    .withColumn("WeatherHour", hour(col("WeatherDate")))
    .withColumn("DateOnly", to_timestamp(date_trunc("day", col("WeatherDate"))))
)

print("Łączenie z pogodą...")
df_joined = _join_flights_with_weather(df_flights_fixed, df_weather_hourly)

print("Statystyki opóźnień w zależności od pogody (JFK):")
display(df_joined.select(*summary_columns).summary())

# Same join for the variant that keeps the upper outliers.
df_flights_fixed2 = df_jfk_with_upper_outliers.withColumn("Hour", hour(col("ScheduledTimestamp")))

df_joined2 = _join_flights_with_weather(df_flights_fixed2, df_weather_hourly)

print("Statystyki opóźnień w zależności od pogody (JFK):")
display(df_joined2.select(*summary_columns).summary())

In [0]:
# Inspect the column names and types of the joined flights + weather frame.
df_joined.printSchema()

In [0]:
# Preview a few joined rows.
display(df_joined.limit(3))

In [0]:
# Remove the raw weather date and the helper columns that only existed to
# perform the join.
join_helper_columns = ["DATE", "DateOnly", "WeatherDate", "WeatherHour", "Hour"]
df_weather_jfk = df_joined.drop(*join_helper_columns)

In [0]:
# Destination of the outlier-free result.
output_path = f"{base_path}/final_results/df_weather_jfk.csv"

# Collapse to a single partition, then write as CSV with a header row,
# replacing any previous output at this path.
single_partition = df_weather_jfk.coalesce(1)
csv_writer = single_partition.write.mode("overwrite").option("header", "true")
csv_writer.csv(output_path)

print(f"Zapisano do: {output_path}")

In [0]:
# Destination of the variant that keeps the upper outliers.
output_path = f"{base_path}/final_results/df_weather_jfk_with_upper_outliers.csv"

# Build this output from df_joined2 (the join based on
# df_jfk_with_upper_outliers). The previous version wrote df_weather_jfk again,
# so this file was a copy of the outlier-free output.
df_weather_jfk_with_upper_outliers = df_joined2.drop(
    "DATE", "DateOnly", "WeatherDate", "WeatherHour", "Hour"
)

# Single partition, header row, overwrite any previous output at this path.
df_weather_jfk_with_upper_outliers.coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(output_path)

print(f"Zapisano do: {output_path}")