In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create (or reuse) a local Spark session that runs on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("read_csv2")
    .getOrCreate()
)

In [2]:
# Explicit schema for flight_data.csv: two nullable string columns holding the
# country names, followed by a nullable integer COUNT column.
my_schema = StructType([
    StructField(column_name, column_type, nullable=True)
    for column_name, column_type in (
        ("DESTINATION_COUNTRY_NAME", StringType()),
        ("ORIGIN_COUNTRY_NAME", StringType()),
        ("COUNT", IntegerType()),
    )
])


In [3]:
# MODE : PERMISSIVE (Spark's default CSV mode). Records that do not fit
# `my_schema` are kept; fields that are missing or cannot be parsed become null.
#
# `header=true` makes the reader consume the first line of every input file as
# a header instead of returning it as a data row. With `header=false` that line
# would appear as a row whose COUNT is null ("count" is not an integer).
# Because an explicit schema is supplied, the schema's column names are used
# (Spark's `enforceSchema` option defaults to true), so `inferSchema` is not needed.
flight_df = spark.read.format("csv") \
                .option("header", "true") \
                .schema(my_schema) \
                .option("mode", "PERMISSIVE") \
                .load("flight_data.csv")
                

In [4]:
flight_df.show(n=5)  # preview the first five rows of the PERMISSIVE read

+------------------------+-------------------+-----+
|DESTINATION_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|COUNT|
+------------------------+-------------------+-----+
|           United States|            Romania|    1|
|           United States|            Ireland|  264|
|           United States|              India|   69|
|                   Egypt|      United States|   24|
|       Equatorial Guinea|      United States|    1|
+------------------------+-------------------+-----+
only showing top 5 rows



In [5]:
# MODE : DROPMALFORMED. Records that cannot be parsed against `my_schema`
# (e.g. a COUNT value that is not an integer) are dropped from the result.
#
# The header line is consumed explicitly with `header=true`. Spark's CSV reader
# has no "skip first N rows" option; unknown option names are silently ignored.
# `inferSchema` is unnecessary because an explicit schema is supplied.
flight_df = spark.read.format("csv") \
                .option("header", "true") \
                .schema(my_schema) \
                .option("mode", "DROPMALFORMED") \
                .load("flight_data.csv")

In [12]:
# MODE : FAILFAST. The read raises an exception as soon as it meets a malformed
# record. (PERMISSIVE, not FAILFAST, is Spark's default CSV mode.)
#
# No schema is given and inferSchema is off, so every column is read as a
# string and named _c0, _c1, ... . Because header is "false", the file's
# header line comes back as an ordinary data row (see the output below).
flight_df2 = spark.read.options(
    header="false",
    inferSchema="false",
    mode="FAILFAST",
).csv("flight_data.csv")


In [13]:
flight_df2.show(n=5)  # preview the first five rows of the FAILFAST read

+-----------------+-------------------+-----+
|              _c0|                _c1|  _c2|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
+-----------------+-------------------+-----+
only showing top 5 rows

