http://spark.apache.org/docs/latest/

## Generic Load/Save Functions

In [2]:
import findspark

In [3]:
findspark.init('/home/daniel/spark-2.4.7-bin-hadoop2.7')

In [4]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession \
        .builder \
        .appName('Python Spark Basics') \
        .config('spark.some.config.option','some-value') \
        .getOrCreate()

In [6]:
# load parquet file
df = spark.read.load('/home/daniel/spark-2.4.7-bin-hadoop2.7/examples/src/main/resources/users.parquet')

In [10]:
# Save new parquet file (select: name, color) as namesAndFavColors to file path...
df.select('name','favorite_color').write.save('/home/daniel/spark-2.4.7-bin-hadoop2.7/examples/src/main/resources/namesAndFavColors.parquet')

In [11]:
# Run SQL on files directly
df2 = spark.sql("SELECT * FROM parquet.`/home/daniel/spark-2.4.7-bin-hadoop2.7/examples/src/main/resources/users.parquet`")

In [12]:
df2.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



## Generic File Source Options

#### ignore Corrupt files

In [13]:
#enable ignore corrupt files
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")

DataFrame[key: string, value: string]

In [15]:
# dir1/file3.json is corrupt from parquet's view
test_corrupt_df = spark.read.parquet("/home/daniel/spark-2.4.7-bin-hadoop2.7/examples/src/main/resources/namesAndFavColors.parquet")

## Parquet Files

#### parquet is a columnar format that is supported by many other data processing systems. When reading, all columns are automatically converted to be nullable for compatibility reasons

In [17]:
peopleDF = spark.read.json("/home/daniel/spark-2.4.7-bin-hadoop2.7/examples/src/main/resources/people.json")

In [18]:
# DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

In [20]:
# Parquet files  are  self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame
parquetFile = spark.read.parquet("people.parquet")

In [21]:
# Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")

In [22]:
teenagers.show()

+------+
|  name|
+------+
|Justin|
+------+



## JSON Files

#### Spark SQL can  automatically infer the schema of a JSON dataset and load it as a Dataset[Row]. 
#### This conversion can be done using SparkSession.read.json()

In [24]:
# using spark from previous example
sc = spark.sparkContext

In [25]:
people_path = "/home/daniel/spark-2.4.7-bin-hadoop2.7/examples/src/main/resources/people.json"
peopleDF  = spark.read.json(people_path)

In [26]:
peopleDF.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [27]:
# create temp view using dataframe
peopleDF.createOrReplaceTempView("people")

In [28]:
teenagerNameDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")

In [29]:
teenagerNameDF.show()

+------+
|  name|
+------+
|Justin|
+------+



#### Aternatively, DF can be created for a JSON dataset 
#### represented by an RDD[String] storing one JSON object per string

In [30]:
jsonStrings = ['{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}}']

In [31]:
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)

In [32]:
otherPeople.show()

+----------------+----+
|         address|name|
+----------------+----+
|[Columbus, Ohio]| Yin|
+----------------+----+



## Hive Tables

In [33]:
from os.path import join, abspath

In [34]:
from pyspark.sql import Row

In [35]:
# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

In [36]:
spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Hive integration example") \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .enableHiveSupport() \
        .getOrCreate()

In [38]:
# spark is an existing SparkSession 
# NOTE: WILL REQUIRE HIVE - Py4JJaveERROR
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH '/home/daniel/spark-2.4.7-bin-hadoop2.7/examples/src/main/resources/kv1.txt' INTO TABLE src")

AnalysisException: "Hive support is required to CREATE Hive TABLE (AS SELECT);;\n'CreateTable `src`, Ignore\n"