In [None]:
! pip install pyspark

In [None]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
from pyspark.sql.functions import col, sum
import pyspark.sql.functions as F

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
df=spark.read.csv("/kaggle/input/datasets-for-pyspark-project/airports.csv",header=True)

In [None]:
df.show(10)

In [None]:
print("Columns:", df.columns)

In [None]:
null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts.show()

In [None]:
df2 = spark.read.csv('/kaggle/input/datasets-for-pyspark-project/flights_small.csv', header=True)
df2.show(10)

In [None]:
print(df2.columns)

In [None]:
null_counts_2 = df2.select([sum(col(c).isNull().cast("int")).alias(c) for c in df2.columns])
null_counts_2.show()

In [None]:
df2=df2.withColumn("duration_hrs",df2.air_time/60)
df2.show(5)

In [None]:
df2.describe().show()

In [None]:
long_flight=df2.filter("distance>1000")
long_flight.show()

In [None]:
long_flight2 = df2.filter(df2.distance > 1000 )
long_flight2.show()

In [None]:
df2.show()

In [None]:
selected_1 = df2.select('tailnum', 'origin', 'dest')
selected_1.show()

In [None]:
temp = df2.select(df2.origin,df2.dest,df2.carrier)
temp.show()

In [None]:
filterA = df2.origin == 'SEA'
filterB =df2.dest == 'PDX'


In [None]:
selected_2 = temp.filter(filterA).filter(filterB)
selected_2.show()

In [None]:
avg_speed = (df2.distance/(df2.air_time/60)).alias("avg_speed")
speed_1 = df2.select('origin','dest','tailnum', avg_speed)
speed_1.show()

In [None]:
speed_2 =df2.selectExpr('origin','dest','tailnum','distance/(air_time/60) as avg_speed')
speed_2.show()

In [None]:
df2 = df2.withColumn('distance', df2.distance.cast('float'))
df2 = df2.withColumn('air_time', df2.air_time.cast('float'))

df2.describe('air_time', 'distance').show()

In [None]:
df2.show()

In [None]:
df2.filter(df2.origin =='PDX').groupBy().min('distance').show()

In [None]:
df2.filter(df2.origin == 'SEA').groupBy().max('air_time').show()

In [None]:
df2.filter(df2.carrier == 'DL').filter(df2.origin == 'SEA').groupBy().avg('air_time').show()

In [None]:
df2.withColumn('duration_hrs', df2.air_time/60).groupBy().sum('duration_hrs').show()

In [None]:
by_plane = df2.groupBy('tailnum')
by_plane.count().show()

In [None]:
by_origin = df2.groupBy('origin')
by_origin.avg('air_time').show()

In [None]:
df2 = df2.withColumn('dep_delay',df2.dep_delay.cast('float'))
df2.show()

In [None]:
by_month_dest = df2.groupBy('month', 'dest')
by_month_dest.avg('dep_delay').show()

In [None]:
from pyspark.sql.types import NumericType
non_numeric_cols = [field.name for field in df2.schema.fields if not isinstance(field.dataType, NumericType)]

In [None]:
print("Non-numeric columns:", non_numeric_cols)

In [None]:
df= df.withColumnRenamed('faa','dest')
df.show()

In [None]:
df3=df2.join(df, on='dest', how='leftouter')
df3.show()

In [None]:
df4=spark.read.csv("/kaggle/input/datasets-for-pyspark-project/planes.csv",header=True)
df4.show()

In [None]:
df4=df4.withColumnRenamed('year', 'plane_year')

In [None]:
df5 = df2.join(df4, on='tailnum', how='leftouter')

In [None]:
df5.show()

In [None]:
df5.describe().show()

In [None]:
df5.show()

In [None]:
string_value= [field.name for field in df5.schema.fields if not isinstance(field.dataType, NumericType)]

In [None]:
string_value

In [None]:
columns_to_drop = ["year","month","day",'tailnum', 'carrier', 'origin', 'dest', 'type', 'manufacturer', 'model', 'engine']

In [None]:
df5 = df5.drop(*columns_to_drop)

In [None]:
df5.show()

In [None]:
df5.printSchema()

In [None]:
from pyspark.sql.functions import col,isnan, when, count


In [None]:
from pyspark.sql.functions import mean, col, when

# Columns whose missing entries are replaced by the column mean.
# NOTE(review): dep_delay is later used as the regression target, so its
# missing labels are filled with a mean computed over all rows, before the
# train/test split — confirm this is intended.
numeric_columns = ["dep_delay", "air_time", "duration_hrs"]

# A single aggregation returns one Row holding the mean of each listed column.
mean_values = df5.select(
    [mean(name).alias(name) for name in numeric_columns]
).collect()[0]

# Keep each existing value; fall back to the column mean only where it is null.
for c in numeric_columns:
    df5 = df5.withColumn(
        c,
        when(col(c).isNotNull(), col(c)).otherwise(mean_values[c]),
    )

df5.show()


In [None]:
# Columns that receive the "unknown" placeholder where they are null.
# NOTE(review): these same columns are cast to double further down; the
# placeholder is not a number, so it cannot survive that cast — confirm this
# fill is still wanted.
string_columns = [
    "arr_time", "arr_delay", "flight", "hour", "minute",
    "plane_year", "engines", "seats", "speed",
]

df5 = df5.fillna("unknown", subset=string_columns)


In [None]:
# Define the list of columns where null values need to be replaced
null_col = ["dep_delay", "air_time", "duration_hrs", "plane_year", "engines", "seats", "speed"]

# Replace null values with 0 for the specified columns
df5 = df5.na.fill(value=0, subset=null_col)

# Show the updated DataFrame
df5.show()


In [None]:
df5.printSchema()

In [None]:
from pyspark.sql.functions import coalesce, col, lit, when

# Columns that must be numeric for modelling.
columns_to_cast = ["dep_time", "arr_time", "arr_delay", "flight", "hour", "minute", "plane_year", "engines", "seats", "speed"]

# Cast first, then default to 0.0.
# The previous order (replace nulls with "0", then cast) only covered values
# that were already null: a string that cannot be parsed as a number (for
# example the "unknown" placeholder filled in earlier) turns into null during
# the cast itself (non-ANSI cast semantics) and so escaped the replacement.
# Coalescing after the cast covers both original nulls and unparseable strings.
for c in columns_to_cast:
    df5 = df5.withColumn(c, coalesce(col(c).cast("double"), lit(0.0)))

# Bring the already-numeric columns to double as well so all inputs share a type.
numeric_columns = ["dep_delay", "air_time", "distance", "duration_hrs"]
for c in numeric_columns:
    df5 = df5.withColumn(c, col(c).cast("double"))

# Check the schema after conversion
df5.printSchema()

# Show the updated DataFrame
df5.show()


In [None]:
# Fill all null values with 0
df5 = df5.na.fill(0)

# Show the DataFrame to verify
df5.show()


In [None]:
from pyspark.sql.functions import col, isnan, when, count

df5.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df5.columns]).show()


In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
features = ["dep_time", "arr_time", "air_time", "distance", "hour", "minute", 
            "duration_hrs", "plane_year", "engines", "seats", "speed"]
target = "dep_delay"

In [None]:
assembler = VectorAssembler(inputCols=features, outputCol="features")
df5 = assembler.transform(df5)

In [None]:
df5 = df5.select("features", target)
df5 = df5.dropna()

In [None]:
df5.show()

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Seeded random split with weights 0.8 / 0.2.
train_data, test_data = df5.randomSplit(weights=[0.8, 0.2], seed=42)
print(f"Training data count: {train_data.count()}")
print(f"Testing data count: {test_data.count()}")

In [None]:
lr = LinearRegression(featuresCol="features", labelCol=target)
lr_model = lr.fit(train_data)

In [None]:
print(f"Coefficients: {lr_model.coefficients}")
print(f"Intercept: {lr_model.intercept}")

In [None]:
predictions = lr_model.transform(test_data)
predictions.select("features", target, "prediction").show(5)

In [None]:
# Evaluator that compares the prediction column with the label using RMSE.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol=target,
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

In [None]:
# Compute R2 with a per-call parameter override instead of setMetricName():
# the setter mutates the shared `evaluator` in place, so re-running the RMSE
# cell afterwards would report R2 under the RMSE label. The override applies
# to this call only and leaves `evaluator` configured for RMSE.
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print(f"R2 score: {r2}")