# Data-Source to Data-Frame with pyspark

## Setup

In [1]:
import pyspark
from pyspark.sql import SparkSession

session = SparkSession.builder.appName('Ds to Df').getOrCreate()

In [2]:
cars_df = session.read.load(
    '../spark_files/cars.csv', 
    format='csv', 
    header=True, 
    inferschema=True)
# 👀 load method necessary to transcribe ds to df


## Important Methods & Attributes

### .printSchema()

In [3]:
cars_df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Displacement: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Acceleration: double (nullable = true)
 |-- Model: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- city: string (nullable = true)



### .columns

In [4]:
cars_df.columns  # returns list

['Car',
 'MPG',
 'Cylinders',
 'Displacement',
 'Horsepower',
 'Weight',
 'Acceleration',
 'Model',
 'Origin',
 'quantity',
 'city']

In [8]:
len(cars_df.columns)

11

### .head()

In [6]:
print(cars_df.head()) # head takes N number of rows
print("")
print(type(cars_df.head()))  # returns a 'row' type

Row(Car='AMC Ambassador Brougham', MPG=13.0, Cylinders=8, Displacement=360.0, Horsepower=175, Weight=3821, Acceleration=11.0, Model=73, Origin='US', quantity=25, city='NewYork')

<class 'pyspark.sql.types.Row'>


### .tail()

In [7]:
cars_df.tail(2)  # N number of rows, just like heads

[Row(Car='Volvo 264gl', MPG=17.0, Cylinders=6, Displacement=163.0, Horsepower=125, Weight=3140, Acceleration=13.6, Model=78, Origin='Europe', quantity=320, city='NewYork'),
 Row(Car='Volvo Diesel', MPG=30.7, Cylinders=6, Displacement=145.0, Horsepower=76, Weight=3160, Acceleration=19.6, Model=81, Origin='Europe', quantity=406, city='NJ')]

### .describe() & .show() ⭐

In [10]:
cars_df.describe('Car').show()
# w/o .show() cars_df.describe('') will just return a dataframe object

+-------+--------------------+
|summary|                 Car|
+-------+--------------------+
|  count|                 406|
|   mean|                null|
| stddev|                null|
|    min|AMC Ambassador Br...|
|    max|        Volvo Diesel|
+-------+--------------------+



### .orderBy()

In [11]:
cars_df.orderBy(cars_df['Acceleration'].desc()).show(10)
# NOTE 
# Pylint does NOT recognize .desc() as a method 
# Columns can also be recognized by dot notation (cars_df.Acceleration)


+--------------------+----+---------+------------+----------+------+------------+-----+------+--------+-------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|quantity|   city|
+--------------------+----+---------+------------+----------+------+------------+-----+------+--------+-------+
|         Peugeot 504|27.2|        4|       141.0|        71|  3190|        24.8|   79|Europe|     344|NewYork|
|   Volkswagen Pickup|44.0|        4|        97.0|        52|  2130|        24.6|   82|Europe|      96|NewYork|
|Volkswagen Dasher...|43.4|        4|        90.0|        48|  2335|        23.7|   80|Europe|     371| DALLAS|
|   Volkswagen Type 3|23.0|        4|        97.0|        54|  2254|        23.5|   72|Europe|     104|NewYork|
|  Chevrolet Chevette|29.0|        4|        85.0|        52|  2035|        22.2|   76|    US|     240|  TEXAS|
|Oldsmobile Cutlas...|23.9|        8|       260.0|        90|  3420|        22.2|   79|    US|     345|N

## Creating a Dataframe from a JSON Data Source

In [17]:
zipcode_df = session.read.load('../spark_files/zipcode.json', format='json')
zipcode_df.printSchema()
# NOTE
# can also be accessed w/ session.read.json('')

root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- EstimatedPopulation: long (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Long: double (nullable = true)
 |-- Notes: string (nullable = true)
 |-- RecordNumber: long (nullable = true)
 |-- State: string (nullable = true)
 |-- TaxReturnsFiled: long (nullable = true)
 |-- TotalWages: long (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Xaxis: double (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- Zipcode: long (nullable = true)



In [19]:
zipcode_df.show(10)  # 👀 kind of a lot of headers

+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|               City|Country|Decommisioned|EstimatedPopulation|  Lat|            Location|        LocationText|  LocationType|   Long|        Notes|RecordNumber|State|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|
+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|        PARC PARQUE|     US|        false|               null|17.96|NA-US-PR-PARC PARQUE|     Parc Parque, PR|NOT ACCEPTABLE| -66.22|         null|           1|   PR|           null|      null|         NA| 0.38|-0.87|  0.3|   STANDARD|    704|
|PASEO COSTA DEL SUR

### Multi-Line JSON 

In [21]:
multiline_df = session.read.option("multiline", "true").json('../spark_files/multiline-zipcode.json')
# note the additional kwargs passed through a seperate method, and are text-based

In [22]:
multiline_df.show()

+-------------------+------------+-----+-----------+-------+
|               City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR|           2|   PR|   STANDARD|    704|
|       BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+-------------------+------------+-----+-----------+-------+



### Reading *multiple* JSON files into a single pysql dataframe

#### Specifying with a list

In [23]:
multiple_df = session.read.json(['../spark_files/zipcode.json',  # passing list
                                 '../spark_files/zipcode1.json'])
multiline_df.show()

+-------------------+------------+-----+-----------+-------+
|               City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR|           2|   PR|   STANDARD|    704|
|       BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+-------------------+------------+-----+-----------+-------+



#### All JSONs in a directory with *.json

In [24]:
multiple_json_dir_df = session.read.json('../spark_files/*.json')

In [26]:
multiple_json_dir_df.show(5)

+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+------+-----+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+---------------+
|               City|Country|Decommisioned|EstimatedPopulation|  Lat|            Location|        LocationText|  LocationType|  Long|Notes|RecordNumber|State|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|_corrupt_record|
+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+------+-----+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+---------------+
|        PARC PARQUE|     US|        false|               null|17.96|NA-US-PR-PARC PARQUE|     Parc Parque, PR|NOT ACCEPTABLE|-66.22| null|           1|   PR|           null|      null|         NA| 0.38|-0.87|  0.3|   STANDARD|    704|         

### Defining a custom Schema

In [27]:
from pyspark.sql.types import *


In [28]:
zipcode_json_schema = StructType([
      StructField("RecordNumber",IntegerType(),True),  # name, type, nullable (T/F)
      StructField("Zipcode",IntegerType(),True),
      StructField("ZipCodeType",StringType(),True),
      StructField("City",StringType(),True),
      StructField("State",StringType(),True),
      StructField("LocationType",StringType(),True),
      StructField("Lat",DoubleType(),True),
      StructField("Long",DoubleType(),True),
      StructField("Xaxis",IntegerType(),True),
      StructField("Yaxis",DoubleType(),True),
      StructField("Zaxis",DoubleType(),True),
      StructField("WorldRegion",StringType(),True),
      StructField("Country",StringType(),True),
      StructField("LocationText",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("Decommisioned",BooleanType(),True),
      StructField("TaxReturnsFiled",StringType(),True),
      StructField("EstimatedPopulation",IntegerType(),True),
      StructField("TotalWages",IntegerType(),True),
      StructField("Notes",StringType(),True)
  ])


In [29]:
zipcode_json_df = session.read.schema(zipcode_json_schema).json('../spark_files/zipcode.json')

In [30]:
zipcode_json_df.printSchema()

root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Xaxis: integer (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- TaxReturnsFiled: string (nullable = true)
 |-- EstimatedPopulation: integer (nullable = true)
 |-- TotalWages: integer (nullable = true)
 |-- Notes: string (nullable = true)



In [31]:
zipcode_json_df.show()

+------------+-------+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|RecordNumber|Zipcode|ZipCodeType|               City|State|  LocationType|  Lat|   Long|Xaxis|Yaxis|Zaxis|WorldRegion|Country|        LocationText|            Location|Decommisioned|TaxReturnsFiled|EstimatedPopulation|TotalWages|        Notes|
+------------+-------+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|           1|    704|   STANDARD|        PARC PARQUE|   PR|NOT ACCEPTABLE|17.96| -66.22| null|-0.87|  0.3|         NA|     US|     Parc Parque, PR|NA-US-PR-PARC PARQUE|        false|           null|               null|      null|         null|
|           2|    70

### Stopping a Spark Session with .stop()

In [32]:
session.stop()