In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DateType

In [2]:
# Create (or fetch, via getOrCreate) the local SparkSession named 'ReadFiles' that every cell below uses.
spark = SparkSession.builder.master('local[*]').appName('ReadFiles').getOrCreate()

24/03/01 17:50:04 WARN Utils: Your hostname, MacBook-Pro-de-Mihai.local resolves to a loopback address: 127.0.0.1; using 192.168.18.250 instead (on interface en0)
24/03/01 17:50:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/01 17:50:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Explicit schema for the CSV file read below: one (column name, Spark type) pair per
# column, in file order. Every column is declared nullable.
schema = StructType([
    StructField(column_name, column_type, nullable=True)
    for column_name, column_type in [
        ("TotalCost", IntegerType()),
        ("BirthDate", DateType()),
        ("Gender", StringType()),
        ("TotalChildren", IntegerType()),
        ("ProductCategoryName", StringType()),
    ]
])

In [36]:
# Preview the first 20 rows of a Parquet file. No .schema() or header option is given:
# column names and types come from the file itself.
# NOTE(review): this Parquet file lives under data/csv/ — confirm the location is intended.
(spark.read
      .parquet('../data/csv/random/input.parquet')
      .show(n=20))

+------+---+------+-----+-------------------+-----------+
|  size|age|  team|  win|               date|       prob|
+------+---+------+-----+-------------------+-----------+
|   big| 33|yellow| true|2022-06-15 00:00:00|   0.487826|
|medium| 15|yellow|false|2022-11-28 00:00:00|  0.5943772|
|   big| 29|  blue| true|2022-10-10 00:00:00|0.008020305|
|medium| 15| green| true|2022-10-02 00:00:00|  0.4362723|
|   big|  1|  blue| true|2020-09-29 00:00:00|   0.327922|
| small| 33|  blue| true|2020-05-23 00:00:00| 0.32705152|
| small| 22|yellow| true|2020-11-01 00:00:00|  0.3758683|
|   big| 39| green|false|2022-07-04 00:00:00|  0.5921349|
|   big| 36|   red| true|2021-10-12 00:00:00| 0.18010604|
| small| 15|  blue|false|2020-10-16 00:00:00|  0.5274084|
| small| 21|   red| true|2020-05-03 00:00:00|  0.7999763|
| small| 11|  blue| true|2020-11-14 00:00:00|  0.9940903|
| small| 38|yellow| true|2020-10-27 00:00:00|  0.9335122|
|medium| 38|yellow|false|2022-03-30 00:00:00| 0.42175278|
| small|  5|ye

### CSV files

In [9]:
# Read the CSV with the explicit schema defined above, so columns get the declared
# types (integer, date, string). Without .schema(...), Spark does NOT infer types by
# default (inferSchema is off) and reads every column as a string, as the schemaless
# read further down shows.
df = (spark.read
      .option("header", "true")   # first line holds the column names
      .option("delimiter", "|")   # fields are pipe-separated, not comma-separated
      .schema(schema)
      .csv('../data/csv/stream.csv'))

In [10]:
# Show the column names, types and nullability produced by the explicit schema.
df.printSchema()

root
 |-- TotalCost: integer (nullable = true)
 |-- BirthDate: date (nullable = true)
 |-- Gender: string (nullable = true)
 |-- TotalChildren: integer (nullable = true)
 |-- ProductCategoryName: string (nullable = true)


In [11]:
# Display the first two rows of the schema-typed CSV DataFrame.
df.show(n=2)

+---------+----------+------+-------------+-------------------+
|TotalCost| BirthDate|Gender|TotalChildren|ProductCategoryName|
+---------+----------+------+-------------+-------------------+
|     1000|      null|  Male|            2|         Technology|
|     2000|1957-03-06|  null|            3|             Beauty|
+---------+----------+------+-------------+-------------------+


In [17]:
# Same CSV read without .schema(...): Spark then reads every column as a string.
# Adding .option("inferSchema", "true") would make Spark guess the types from the data,
# but those guesses can be wrong, so an explicit schema (as above) is the safer choice.
df = (
    spark.read
    .option("header", "true")
    .option("delimiter", "|")
    .csv('../data/csv/stream.csv')
)

In [34]:
# Reader options can also be supplied all at once by unpacking a dict into .options():
options = dict(header='true', delimiter='|')
(spark.read
      .options(**options)
      .csv('../data/csv/stream.csv')
      .show(n=2))

+---------+----------+------+-------------+-------------------+
|TotalCost| BirthDate|Gender|TotalChildren|ProductCategoryName|
+---------+----------+------+-------------+-------------------+
|     1000|      null|  Male|            2|         Technology|
|     2000|1957-03-06|  null|            3|             Beauty|
+---------+----------+------+-------------+-------------------+


In [18]:
# Every column is reported as string: this df was read with no schema and without inferSchema.
df.printSchema()

root
 |-- TotalCost: string (nullable = true)
 |-- BirthDate: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- TotalChildren: string (nullable = true)
 |-- ProductCategoryName: string (nullable = true)


In [20]:
# Display the first two rows of the all-string CSV DataFrame.
df.show(n=2)

+---------+----------+------+-------------+-------------------+
|TotalCost| BirthDate|Gender|TotalChildren|ProductCategoryName|
+---------+----------+------+-------------+-------------------+
|     1000|      null|  Male|            2|         Technology|
|     2000|1957-03-06|  null|            3|             Beauty|
+---------+----------+------+-------------+-------------------+


### JSON files

In [28]:
# Read a simple JSON file holding one JSON object per line; no schema is supplied,
# so the column types come from Spark's inference (see printSchema below).
df = spark.read.json('../data/json/zipcodes.json')

In [29]:
# The boolean/long/double/string types below were inferred from the JSON data (no schema was given).
df.printSchema()

root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- EstimatedPopulation: long (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Long: double (nullable = true)
 |-- Notes: string (nullable = true)
 |-- RecordNumber: long (nullable = true)
 |-- State: string (nullable = true)
 |-- TaxReturnsFiled: long (nullable = true)
 |-- TotalWages: long (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Xaxis: double (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- Zipcode: long (nullable = true)


In [30]:
# Display the first two rows of the line-delimited JSON DataFrame.
df.show(n=2)

+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+------+-----+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|               City|Country|Decommisioned|EstimatedPopulation|  Lat|            Location|        LocationText|  LocationType|  Long|Notes|RecordNumber|State|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|
+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+------+-----+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|        PARC PARQUE|     US|        false|               null|17.96|NA-US-PR-PARC PARQUE|     Parc Parque, PR|NOT ACCEPTABLE|-66.22| null|           1|   PR|           null|      null|         NA| 0.38|-0.87|  0.3|   STANDARD|    704|
|PASEO COSTA DEL SUR|     US|        false|             

In [24]:
# The 'multiline' option lets Spark read a JSON array that spans several lines,
# e.g. [{...}, ..., {...}], instead of expecting one object per line.
df = (spark.read
      .option('multiline', True)
      .json('../data/json/multiline-zipcode.json'))
# JSON can also be read with an explicit schema by adding .schema(schema=json_schema) to the chain.

In [25]:
# Inspect the schema Spark inferred for the multiline JSON file.
df.printSchema()

root
 |-- City: string (nullable = true)
 |-- RecordNumber: long (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- Zipcode: long (nullable = true)


In [26]:
# Display the first two rows of the multiline JSON DataFrame.
df.show(n=2)

+-------------------+------------+-----+-----------+-------+
|               City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR|           2|   PR|   STANDARD|    704|
|       BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+-------------------+------------+-----+-----------+-------+
