In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
import time

nyc_schema = StructType([
  StructField('Vendor', StringType(), True),
  StructField('Pickup_DateTime', TimestampType(), True),
  StructField('Dropoff_DateTime', TimestampType(), True),
  StructField('Passenger_Count', IntegerType(), True),
  StructField('Trip_Distance', DoubleType(), True),
  StructField('Pickup_Longitude', DoubleType(), True),
  StructField('Pickup_Latitude', DoubleType(), True),
  StructField('Rate_Code', StringType(), True),
  StructField('Store_And_Forward', StringType(), True),
  StructField('Dropoff_Longitude', DoubleType(), True),
  StructField('Dropoff_Latitude', DoubleType(), True),
  StructField('Payment_Type', StringType(), True),
  StructField('Fare_Amount', DoubleType(), True),
  StructField('Surcharge', DoubleType(), True),
  StructField('MTA_Tax', DoubleType(), True),
  StructField('Tip_Amount', DoubleType(), True),
  StructField('Tolls_Amount', DoubleType(), True),
  StructField('Total_Amount', DoubleType(), True)

])
path = "dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-12.csv.gz"
df = spark.read.format('csv').options(header=True).schema(nyc_schema).load(path)

df_destroyed = df.withColumn("Passenger_Count", when( (col("Passenger_Count") == 0), "Zero").otherwise(col("Passenger_Count") ))
display(df_destroyed)

df_first_1000 = df_destroyed.limit(1000)
df_first_1000.write.csv("dbfs:/tmp/destroyed_file", sep=";", mode="overwrite")
dpath = "dbfs:/tmp/destroyed_file"

Vendor,Pickup_DateTime,Dropoff_DateTime,Passenger_Count,Trip_Distance,Pickup_Longitude,Pickup_Latitude,Rate_Code,Store_And_Forward,Dropoff_Longitude,Dropoff_Latitude,Payment_Type,Fare_Amount,Surcharge,MTA_Tax,Tip_Amount,Tolls_Amount,Total_Amount
1,2019-12-01T00:26:58.000+0000,2019-12-01T00:41:45.000+0000,1,4.2,1.0,,142,116,2.0,14.5,3.0,0.5,0.0,0.0,0.3,18.3,2.5
1,2019-12-01T00:12:08.000+0000,2019-12-01T00:12:14.000+0000,1,0.0,1.0,,145,145,2.0,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
1,2019-12-01T00:25:53.000+0000,2019-12-01T00:26:04.000+0000,1,0.0,1.0,,145,145,2.0,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
1,2019-12-01T00:12:03.000+0000,2019-12-01T00:33:19.000+0000,2,9.4,1.0,,138,25,1.0,28.5,0.5,0.5,10.0,0.0,0.3,39.8,0.0
1,2019-12-01T00:05:27.000+0000,2019-12-01T00:16:32.000+0000,2,1.6,1.0,,161,237,2.0,9.0,3.0,0.5,0.0,0.0,0.3,12.8,2.5
1,2019-12-01T00:58:51.000+0000,2019-12-01T01:08:37.000+0000,2,1.0,1.0,,161,230,2.0,6.5,3.0,0.5,0.0,0.0,0.3,10.3,2.5
1,2019-12-01T00:14:19.000+0000,2019-12-01T00:27:06.000+0000,Zero,1.7,1.0,,164,163,2.0,10.0,3.0,0.5,0.0,0.0,0.3,13.8,2.5
1,2019-12-01T00:29:35.000+0000,2019-12-01T00:32:29.000+0000,Zero,0.5,1.0,,79,224,1.0,4.0,3.0,0.5,1.55,0.0,0.3,9.35,2.5
1,2019-12-01T00:42:19.000+0000,2019-12-01T00:50:34.000+0000,Zero,1.4,1.0,,79,107,2.0,7.5,3.0,0.5,0.0,0.0,0.3,11.3,2.5
1,2019-12-01T00:19:48.000+0000,2019-12-01T00:24:18.000+0000,1,0.9,1.0,,148,4,1.0,5.5,3.0,0.5,1.85,0.0,0.3,11.15,2.5


In [0]:
df_permissive = spark.read.format("csv").option("header", True).schema(nyc_schema).option("mode", "PERMISSIVE").csv(dpath, sep=";")
print("PERMISSIVE MODE:")
display(df_permissive.limit(20))

PERMISSIVE MODE:


Vendor,Pickup_DateTime,Dropoff_DateTime,Passenger_Count,Trip_Distance,Pickup_Longitude,Pickup_Latitude,Rate_Code,Store_And_Forward,Dropoff_Longitude,Dropoff_Latitude,Payment_Type,Fare_Amount,Surcharge,MTA_Tax,Tip_Amount,Tolls_Amount,Total_Amount
1,2019-12-01T00:12:08.000+0000,2019-12-01T00:12:14.000+0000,1.0,0.0,1.0,,145,145,2.0,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
1,2019-12-01T00:25:53.000+0000,2019-12-01T00:26:04.000+0000,1.0,0.0,1.0,,145,145,2.0,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
1,2019-12-01T00:12:03.000+0000,2019-12-01T00:33:19.000+0000,2.0,9.4,1.0,,138,25,1.0,28.5,0.5,0.5,10.0,0.0,0.3,39.8,0.0
1,2019-12-01T00:05:27.000+0000,2019-12-01T00:16:32.000+0000,2.0,1.6,1.0,,161,237,2.0,9.0,3.0,0.5,0.0,0.0,0.3,12.8,2.5
1,2019-12-01T00:58:51.000+0000,2019-12-01T01:08:37.000+0000,2.0,1.0,1.0,,161,230,2.0,6.5,3.0,0.5,0.0,0.0,0.3,10.3,2.5
1,2019-12-01T00:14:19.000+0000,2019-12-01T00:27:06.000+0000,,1.7,1.0,,164,163,2.0,10.0,3.0,0.5,0.0,0.0,0.3,13.8,2.5
1,2019-12-01T00:29:35.000+0000,2019-12-01T00:32:29.000+0000,,0.5,1.0,,79,224,1.0,4.0,3.0,0.5,1.55,0.0,0.3,9.35,2.5
1,2019-12-01T00:42:19.000+0000,2019-12-01T00:50:34.000+0000,,1.4,1.0,,79,107,2.0,7.5,3.0,0.5,0.0,0.0,0.3,11.3,2.5
1,2019-12-01T00:19:48.000+0000,2019-12-01T00:24:18.000+0000,1.0,0.9,1.0,,148,4,1.0,5.5,3.0,0.5,1.85,0.0,0.3,11.15,2.5
1,2019-12-01T00:36:16.000+0000,2019-12-01T00:53:42.000+0000,3.0,5.5,1.0,,79,226,1.0,18.0,3.0,0.5,4.35,0.0,0.3,26.15,2.5


In [0]:
df_permissive = spark.read.format("csv").option("header", True).schema(nyc_schema).option("dropMalformed", "PERMISSIVE").csv(dpath, sep=";")
print("DROP MALFORMED:")
display(df_permissive.limit(20))

DROP MALFORMED:


Vendor,Pickup_DateTime,Dropoff_DateTime,Passenger_Count,Trip_Distance,Pickup_Longitude,Pickup_Latitude,Rate_Code,Store_And_Forward,Dropoff_Longitude,Dropoff_Latitude,Payment_Type,Fare_Amount,Surcharge,MTA_Tax,Tip_Amount,Tolls_Amount,Total_Amount
1,2019-12-01T00:12:08.000+0000,2019-12-01T00:12:14.000+0000,1.0,0.0,1.0,,145,145,2.0,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
1,2019-12-01T00:25:53.000+0000,2019-12-01T00:26:04.000+0000,1.0,0.0,1.0,,145,145,2.0,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
1,2019-12-01T00:12:03.000+0000,2019-12-01T00:33:19.000+0000,2.0,9.4,1.0,,138,25,1.0,28.5,0.5,0.5,10.0,0.0,0.3,39.8,0.0
1,2019-12-01T00:05:27.000+0000,2019-12-01T00:16:32.000+0000,2.0,1.6,1.0,,161,237,2.0,9.0,3.0,0.5,0.0,0.0,0.3,12.8,2.5
1,2019-12-01T00:58:51.000+0000,2019-12-01T01:08:37.000+0000,2.0,1.0,1.0,,161,230,2.0,6.5,3.0,0.5,0.0,0.0,0.3,10.3,2.5
1,2019-12-01T00:14:19.000+0000,2019-12-01T00:27:06.000+0000,,1.7,1.0,,164,163,2.0,10.0,3.0,0.5,0.0,0.0,0.3,13.8,2.5
1,2019-12-01T00:29:35.000+0000,2019-12-01T00:32:29.000+0000,,0.5,1.0,,79,224,1.0,4.0,3.0,0.5,1.55,0.0,0.3,9.35,2.5
1,2019-12-01T00:42:19.000+0000,2019-12-01T00:50:34.000+0000,,1.4,1.0,,79,107,2.0,7.5,3.0,0.5,0.0,0.0,0.3,11.3,2.5
1,2019-12-01T00:19:48.000+0000,2019-12-01T00:24:18.000+0000,1.0,0.9,1.0,,148,4,1.0,5.5,3.0,0.5,1.85,0.0,0.3,11.15,2.5
1,2019-12-01T00:36:16.000+0000,2019-12-01T00:53:42.000+0000,3.0,5.5,1.0,,79,226,1.0,18.0,3.0,0.5,4.35,0.0,0.3,26.15,2.5


In [0]:
df_permissive = spark.read.format("csv").option("header", True).schema(nyc_schema).option("failFast", "PERMISSIVE").csv(dpath, sep=";")
print("FAIL FAST:")
display(df_permissive.limit(20))

FAIL FAST:


Vendor,Pickup_DateTime,Dropoff_DateTime,Passenger_Count,Trip_Distance,Pickup_Longitude,Pickup_Latitude,Rate_Code,Store_And_Forward,Dropoff_Longitude,Dropoff_Latitude,Payment_Type,Fare_Amount,Surcharge,MTA_Tax,Tip_Amount,Tolls_Amount,Total_Amount
1,2019-12-01T00:12:08.000+0000,2019-12-01T00:12:14.000+0000,1.0,0.0,1.0,,145,145,2.0,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
1,2019-12-01T00:25:53.000+0000,2019-12-01T00:26:04.000+0000,1.0,0.0,1.0,,145,145,2.0,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
1,2019-12-01T00:12:03.000+0000,2019-12-01T00:33:19.000+0000,2.0,9.4,1.0,,138,25,1.0,28.5,0.5,0.5,10.0,0.0,0.3,39.8,0.0
1,2019-12-01T00:05:27.000+0000,2019-12-01T00:16:32.000+0000,2.0,1.6,1.0,,161,237,2.0,9.0,3.0,0.5,0.0,0.0,0.3,12.8,2.5
1,2019-12-01T00:58:51.000+0000,2019-12-01T01:08:37.000+0000,2.0,1.0,1.0,,161,230,2.0,6.5,3.0,0.5,0.0,0.0,0.3,10.3,2.5
1,2019-12-01T00:14:19.000+0000,2019-12-01T00:27:06.000+0000,,1.7,1.0,,164,163,2.0,10.0,3.0,0.5,0.0,0.0,0.3,13.8,2.5
1,2019-12-01T00:29:35.000+0000,2019-12-01T00:32:29.000+0000,,0.5,1.0,,79,224,1.0,4.0,3.0,0.5,1.55,0.0,0.3,9.35,2.5
1,2019-12-01T00:42:19.000+0000,2019-12-01T00:50:34.000+0000,,1.4,1.0,,79,107,2.0,7.5,3.0,0.5,0.0,0.0,0.3,11.3,2.5
1,2019-12-01T00:19:48.000+0000,2019-12-01T00:24:18.000+0000,1.0,0.9,1.0,,148,4,1.0,5.5,3.0,0.5,1.85,0.0,0.3,11.15,2.5
1,2019-12-01T00:36:16.000+0000,2019-12-01T00:53:42.000+0000,3.0,5.5,1.0,,79,226,1.0,18.0,3.0,0.5,4.35,0.0,0.3,26.15,2.5


In [0]:
df_first_1000.write.parquet("dbfs:/tmp/df.parquet", mode="overwrite")
df_first_1000.write.parquet("dbfs:/tmp/df.json", mode="overwrite")
display(spark.read.parquet("/tmp/df.parquet").limit(20))
display(spark.read.parquet("/tmp/df.json").limit(20))

Vendor,Pickup_DateTime,Dropoff_DateTime,Passenger_Count,Trip_Distance,Pickup_Longitude,Pickup_Latitude,Rate_Code,Store_And_Forward,Dropoff_Longitude,Dropoff_Latitude,Payment_Type,Fare_Amount,Surcharge,MTA_Tax,Tip_Amount,Tolls_Amount,Total_Amount
1,2019-12-01T00:26:58.000+0000,2019-12-01T00:41:45.000+0000,1,4.2,1.0,,142,116,2.0,14.5,3.0,0.5,0.0,0.0,0.3,18.3,2.5
1,2019-12-01T00:12:08.000+0000,2019-12-01T00:12:14.000+0000,1,0.0,1.0,,145,145,2.0,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
1,2019-12-01T00:25:53.000+0000,2019-12-01T00:26:04.000+0000,1,0.0,1.0,,145,145,2.0,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
1,2019-12-01T00:12:03.000+0000,2019-12-01T00:33:19.000+0000,2,9.4,1.0,,138,25,1.0,28.5,0.5,0.5,10.0,0.0,0.3,39.8,0.0
1,2019-12-01T00:05:27.000+0000,2019-12-01T00:16:32.000+0000,2,1.6,1.0,,161,237,2.0,9.0,3.0,0.5,0.0,0.0,0.3,12.8,2.5
1,2019-12-01T00:58:51.000+0000,2019-12-01T01:08:37.000+0000,2,1.0,1.0,,161,230,2.0,6.5,3.0,0.5,0.0,0.0,0.3,10.3,2.5
1,2019-12-01T00:14:19.000+0000,2019-12-01T00:27:06.000+0000,Zero,1.7,1.0,,164,163,2.0,10.0,3.0,0.5,0.0,0.0,0.3,13.8,2.5
1,2019-12-01T00:29:35.000+0000,2019-12-01T00:32:29.000+0000,Zero,0.5,1.0,,79,224,1.0,4.0,3.0,0.5,1.55,0.0,0.3,9.35,2.5
1,2019-12-01T00:42:19.000+0000,2019-12-01T00:50:34.000+0000,Zero,1.4,1.0,,79,107,2.0,7.5,3.0,0.5,0.0,0.0,0.3,11.3,2.5
1,2019-12-01T00:19:48.000+0000,2019-12-01T00:24:18.000+0000,1,0.9,1.0,,148,4,1.0,5.5,3.0,0.5,1.85,0.0,0.3,11.15,2.5


Vendor,Pickup_DateTime,Dropoff_DateTime,Passenger_Count,Trip_Distance,Pickup_Longitude,Pickup_Latitude,Rate_Code,Store_And_Forward,Dropoff_Longitude,Dropoff_Latitude,Payment_Type,Fare_Amount,Surcharge,MTA_Tax,Tip_Amount,Tolls_Amount,Total_Amount
1,2019-12-01T00:26:58.000+0000,2019-12-01T00:41:45.000+0000,1,4.2,1.0,,142,116,2.0,14.5,3.0,0.5,0.0,0.0,0.3,18.3,2.5
1,2019-12-01T00:12:08.000+0000,2019-12-01T00:12:14.000+0000,1,0.0,1.0,,145,145,2.0,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
1,2019-12-01T00:25:53.000+0000,2019-12-01T00:26:04.000+0000,1,0.0,1.0,,145,145,2.0,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
1,2019-12-01T00:12:03.000+0000,2019-12-01T00:33:19.000+0000,2,9.4,1.0,,138,25,1.0,28.5,0.5,0.5,10.0,0.0,0.3,39.8,0.0
1,2019-12-01T00:05:27.000+0000,2019-12-01T00:16:32.000+0000,2,1.6,1.0,,161,237,2.0,9.0,3.0,0.5,0.0,0.0,0.3,12.8,2.5
1,2019-12-01T00:58:51.000+0000,2019-12-01T01:08:37.000+0000,2,1.0,1.0,,161,230,2.0,6.5,3.0,0.5,0.0,0.0,0.3,10.3,2.5
1,2019-12-01T00:14:19.000+0000,2019-12-01T00:27:06.000+0000,Zero,1.7,1.0,,164,163,2.0,10.0,3.0,0.5,0.0,0.0,0.3,13.8,2.5
1,2019-12-01T00:29:35.000+0000,2019-12-01T00:32:29.000+0000,Zero,0.5,1.0,,79,224,1.0,4.0,3.0,0.5,1.55,0.0,0.3,9.35,2.5
1,2019-12-01T00:42:19.000+0000,2019-12-01T00:50:34.000+0000,Zero,1.4,1.0,,79,107,2.0,7.5,3.0,0.5,0.0,0.0,0.3,11.3,2.5
1,2019-12-01T00:19:48.000+0000,2019-12-01T00:24:18.000+0000,1,0.9,1.0,,148,4,1.0,5.5,3.0,0.5,1.85,0.0,0.3,11.15,2.5
