In [0]:
# Imports
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, lit, col, to_timestamp, unix_timestamp, current_timestamp
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import min as spark_min, max as spark_max
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

In [0]:
# Read the raw trips table by name through the session's DataFrameReader.
raw_df = spark.read.table("yellow_trip_raw")

In [0]:


# Drop rows that are exact copies of another row.
# (Author's note: the data was already checked and contained no duplicates.)
df_unique = raw_df.dropDuplicates()

# In one pass: attach a generated row id, parse the pickup/dropoff columns into
# timestamps using the pattern below, and stamp each row with the processing time.
df = (
    df_unique
    .withColumn("_id", F.monotonically_increasing_id())
    .withColumn(
        "tpep_pickup_datetime",
        F.to_timestamp(F.col("tpep_pickup_datetime"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"),
    )
    .withColumn(
        "tpep_dropoff_datetime",
        F.to_timestamp(F.col("tpep_dropoff_datetime"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"),
    )
    .withColumn("processing_date", F.current_timestamp())
)

# Valid pickup window: 2023-01-01 through 2023-04-30 inclusive.
# It is written as the half-open interval [2023-01-01, 2023-05-01) because a bare
# date literal compares as midnight at the start of that day; an upper bound of
# "2023-04-30" would therefore reject pickups made during April 30 itself.
pickup_in_range = (
    (col("tpep_pickup_datetime") >= lit("2023-01-01")) &
    (col("tpep_pickup_datetime") < lit("2023-05-01"))
)

# Quarantine everything outside the window. A row is out of range when the pickup
# is before the start OR on/after the end (the two bounds can never both fail at
# once, so they must not be AND-ed together). A NULL pickup makes the comparison
# NULL rather than False, so it is matched explicitly and quarantined as well.
df_invalid_date = df.filter(pickup_in_range.isNull() | ~pickup_in_range)

# Keep the in-range rows. A direct filter is used instead of DataFrame.subtract,
# which is a set difference and needs a full row-by-row comparison.
df = df.filter(pickup_in_range)

# Remove trips with inconsistent times, and trips shorter than one minute.

# Trips whose pickup is later than their dropoff.
pickup_after_dropoff = col("tpep_pickup_datetime") > col("tpep_dropoff_datetime")
df_inconsistent_time = df.filter(pickup_after_dropoff)

# Trip duration in whole seconds (dropoff minus pickup).
df = df.withColumn(
    "pickup_seconds",
    unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime")),
)

# Rejected-for-duration rows must be captured BEFORE df is narrowed; filtering the
# already-narrowed df for "< 60" can never match anything.
# Rows already reported in df_inconsistent_time are left out so that no trip is
# listed twice, and rows whose duration is NULL (a missing pickup or dropoff) are
# included so they are not dropped silently by the ">= 60" filter below.
lasts_a_minute = col("pickup_seconds") >= 60
df_invalid_duration = df.filter(
    (lasts_a_minute.isNull() | ~lasts_a_minute) &
    (pickup_after_dropoff.isNull() | ~pickup_after_dropoff)
)

# Keeping only trips of 60 seconds or more also removes every pickup-after-dropoff
# row, since their duration is zero or negative.
df = df.filter(lasts_a_minute)

def _split_by_rule(frame, is_valid):
    """Split ``frame`` into the rows that satisfy ``is_valid`` and the rows that do not.

    Both halves are derived from the same input frame, so the rejected half
    really contains the rows the rule removed.

    Args:
        frame: DataFrame to partition.
        is_valid: Boolean Column expression; True marks a row to keep.

    Returns:
        A ``(kept, rejected)`` pair of DataFrames. Rows for which ``is_valid``
        evaluates to NULL (e.g. the checked column is NULL) go to ``rejected``.
    """
    kept = frame.filter(is_valid)
    rejected = frame.filter(is_valid.isNull() | ~is_valid)
    return kept, rejected


# Each rule below peels its rejects off the current df and then narrows df, so a
# row lands in the quarantine frame of the first rule it fails.

# Pickup and dropoff location ids must both fall inside the 1..265 range.
df, df_invalid_location = _split_by_rule(
    df,
    col("PULocationID").between(1, 265) & col("DOLocationID").between(1, 265),
)

# Passenger count must be between 1 and 4 (0, or 5 and above, is rejected).
df, df_invalid_passengers = _split_by_rule(
    df,
    (col("passenger_count") >= 1) & (col("passenger_count") <= 4),
)

# trip_distance must be greater than 0.
df, df_invalid_distance = _split_by_rule(df, col("trip_distance") > 0)

# fare_amount must be greater than 0.
df, df_invalid_fare = _split_by_rule(df, col("fare_amount") > 0)

# payment_type must be within the 1..6 range.
df, df_invalid_payment = _split_by_rule(
    df,
    (col("payment_type") >= 1) & (col("payment_type") <= 6),
)

# VendorID must be present and one of the accepted codes.
df, df_invalid_vendor = _split_by_rule(
    df,
    col("VendorID").isNotNull() & col("VendorID").isin([1, 2, 6, 7]),
)

# Flag (without removing) rows that trip any of the outlier rules: distance above
# 100, fare above 500, or a fare-to-distance ratio below 0.5. The rules are OR-ed
# together left to right.
df = df.withColumn(
    "is_outlier",
    reduce(
        lambda flagged, rule: flagged | rule,
        [
            F.col("trip_distance") > 100,
            F.col("fare_amount") > 500,
            (F.col("fare_amount") / F.col("trip_distance")) < 0.5,
        ],
    ),
)

# Invalid data: stack every quarantine frame into a single table.
# df_invalid_duration is part of the list so that short trips are persisted along
# with the other rejects. allowMissingColumns=True lets frames with different
# column sets (some carry pickup_seconds, some do not) be unioned by name.
rejected_frames = [
    df_invalid_date,
    df_inconsistent_time,
    df_invalid_duration,
    df_invalid_location,
    df_invalid_passengers,
    df_invalid_distance,
    df_invalid_fare,
    df_invalid_payment,
    df_invalid_vendor,
]
df_invalid = reduce(
    lambda stacked, nxt: stacked.unionByName(nxt, allowMissingColumns=True),
    rejected_frames,
)

# Overwrite the quarantine table with this run's rejects.
df_invalid.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("yellow_trip_silver_invalid")


# Rows flagged as outliers, bound to a name so they can be inspected. A bare
# df.filter(...) statement builds a new DataFrame and discards it, leaving df
# untouched, so on its own it has no effect.
df_outliers = df.filter(col("is_outlier"))

# NOTE(review): flagged rows are still written to the silver table below, together
# with the is_outlier column. Confirm that keeping them (rather than excluding
# them) is the intended behavior.
df.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("yellow_trip_silver")