In [24]:
# Create a basic SparkSession

from pyspark.sql import SparkSession

# Build the session with the fluent builder API; the config entry is the
# placeholder key/value pair used by this example.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [4]:
# Creating DataFrames
# Load the JSON file through the SparkSession created above.
df = spark.read.format("json").load("people 2.json")
# Print the DataFrame's rows to stdout as a text table.
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
#Untyped Dataset Operations (aka DataFrame Operations)


**In Python it's possible to access a DataFrame's columns either by attribute (df.age) or by indexing (df['age']). While the former is convenient for interactive data exploration, it is highly recommended to use the latter form, which is future-proof and won't break with column names that are also attributes on the DataFrame class.**

In [6]:
# `spark` and `df` come from the cells above.
# Print the DataFrame's schema as a tree: one line per column showing its
# type and whether it is nullable.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [7]:
# Project just the "name" column and display it
df.select(df["name"]).show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [8]:
# Show every person's name next to their age plus one
df.select(df["name"], df["age"] + 1).show()


+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [9]:
# Keep only the rows whose age is greater than 21
df.where(df["age"] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [10]:
# Group the rows by age and display how many people fall into each group
df.groupby("age").count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



In [11]:
#Running SQL Queries Programmatically


**The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.**

In [12]:
# Register the DataFrame as a SQL temporary view named "people"
# (create-or-replace, so an existing view of that name is replaced).
df.createOrReplaceTempView("people")

In [13]:
# Query the "people" view with SQL; spark.sql returns the result as a DataFrame.
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [14]:
#Global Temporary View

**Temporary views in Spark SQL are session-scoped and will disappear if the session that creates them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view. A global temporary view is tied to the system-preserved database `global_temp`, and we must use the qualified name to refer to it, e.g. `SELECT * FROM global_temp.view1`.**

In [15]:
# Register the DataFrame as a global temporary view.
# createOrReplaceGlobalTempView keeps this cell re-runnable: plain
# createGlobalTempView raises once a "people" view already exists in
# global_temp (for example when the cell is executed a second time within
# the same Spark application).
df.createOrReplaceGlobalTempView("people")

In [17]:
# A global temporary view is tied to the system-preserved database
# `global_temp`, so the query must use the qualified name global_temp.people.
spark.sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [18]:
# Global temporary views are cross-session: a session obtained from
# spark.newSession() can query global_temp.people as well.
spark.newSession().sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [19]:
# Creating Datasets
# WARNING: Datasets are available in Scala and Java only; the Scala snippets below are kept as comments.

*Datasets are similar to RDDs, however, instead of using Java serialization or Kryo
they use a specialized Encoder to serialize the objects for processing or
transmitting over the network. While both encoders and standard serialization 
are responsible for turning an object into bytes, encoders are code generated dynamically and
use a format that allows Spark to perform many operations like filtering,
sorting and hashing without deserializing the bytes back into an object.*

In [None]:
#case class Person(name: String, age: Long)

#Encoders are created for case classes
#val caseClassDS = Seq(Person("Andy", 32)).toDS()
#caseClassDS.show()

In [1]:
#Encoders for most common types are automatically provided by importing spark.implicits._
#val primitiveDS = Seq(1, 2, 3).toDS()
#primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

#DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
#val path = "examples/src/main/resources/people.json"
#val peopleDS = spark.read.json(path).as[Person]
#peopleDS.show()

In [2]:
#Interoperating with RDDs

*
-Spark SQL supports two different methods for converting existing RDDs into
Datasets. The first method uses reflection to infer the schema of an RDD that
contains specific types of objects. This reflection-based approach leads
to more concise code and works well when you already know the schema while
writing your Spark application.

-The second method for creating Datasets is through a programmatic interface that
allows you to construct a schema and then apply it to an existing RDD. While this
method is more verbose, it allows you to construct Datasets when the columns
and their types are not known until runtime.

*

In [3]:
#Inferring the Schema Using Reflection

*Spark SQL can convert an RDD of Row objects to a DataFrame,
inferring the datatypes. Rows are constructed by passing a list of
key/value pairs as kwargs to the Row class. The keys of this list
define the column names of the table, and the types are inferred by
sampling the whole dataset, similar to the inference that is performed on JSON
files.*

In [8]:
from pyspark.sql import Row

sc = spark.sparkContext

# Read the text file and turn every comma-separated line into a Row.
lines = sc.textFile("people.txt")
parts = lines.map(lambda line: line.split(","))
people = parts.map(lambda fields: Row(name=fields[0], age=int(fields[1])))

# Let Spark infer the schema from the Rows, then expose the result to SQL
# under the view name "people".
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# Query the registered view with plain SQL.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <=19")

# A SQL result is itself a DataFrame; its .rdd attribute exposes the content
# as a :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda row: "Name: " + row.name).collect()
for name in teenNames:
    print(name)
    

Name: Justin


In [9]:
# Programmatically Specifying the Schema

When a dictionary of kwargs cannot be defined ahead of time (for example,
the structure of records is encoded in a string, or a text dataset will be
parsed and fields will be projected differently for different users),
a DataFrame can be created programmatically with three steps.
1. Create an RDD of tuples or lists from the original RDD;
2. Create the schema represented by a StructType matching the structure
of tuples or lists in the RDD created in step 1.
3. Apply the schema to the RDD via the createDataFrame method provided by
SparkSession.

In [11]:
# Import only the data types this cell uses; explicit names make it clear
# where StringType, StructField and StructType come from.
from pyspark.sql.types import StringType, StructField, StructType

sc = spark.sparkContext

# Load a text file and split each line on commas.
lines = sc.textFile("people.txt")
parts = lines.map(lambda line: line.split(","))
# Each line is converted to a (name, age) tuple; the age keeps its string
# form, with surrounding whitespace stripped.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

# One nullable string column per whitespace-separated field name.
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



**Aggregations**

The built-in DataFrames functions provide common aggregations such as count(),
countDistinct(), avg(), max(), min(), etc. While those functions are designed for
DataFrames, Spark SQL also has type-safe versions for some of them in Scala and
Java to work with strongly typed Datasets. Moreover, users are not limited to the predefined
aggregate functions and can create their own.

***Generic Load/Save Functions***

In [12]:
# Read with the default data source, then write two of the columns back out.
df = spark.read.load("users.parquet")
# mode="overwrite" keeps the cell re-runnable: with the default save mode the
# write raises once namesAndFavColors.parquet already exists.
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet", mode="overwrite")

In the simplest form, the default data source (parquet unless otherwise configured by
spark.sql.sources.default) will be used for all operations.

In [13]:
#Manually Specifying Operations

You can also manually specify the data source that will be used 
along with any extra options that you would like to pass to data source.
Data sources are specified by their fully qualified name
(i.e. org.apache.spark.sql.parquet), but for built-in sources you can
also use their short names (json, parquet, jdbc, orc, libsvm, csv, text).
DataFrames loaded from any data source type can be converted into 
other types using this syntax.

In [14]:
# Read JSON by naming the source format explicitly.
df = spark.read.load("people 2.json", format="json")
# Write the selected columns as Parquet; mode="overwrite" keeps the cell
# re-runnable, since the default save mode raises when namesAndAges.parquet
# already exists.
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet", mode="overwrite")

In [15]:
#To load a CSV file you can use

In [16]:
# Read the CSV with a header row and schema inference enabled.
# NOTE(review): sep=":" makes ':' the field delimiter — confirm people.csv
# really is colon-separated.
df = (
    spark.read
    .format("csv")
    .options(sep=":", inferSchema="true", header="true")
    .load("people.csv")
)

You can control bloom filters and dictionary encodings for ORC data sources.
The following ORC example will create a bloom filter on favorite_color and
use dictionary encoding for name and favorite_color. For Parquet,
there exists parquet.enable.dictionary, too.
To find more detailed information about the extra ORC/Parquet options, visit
the official Apache ORC/Parquet websites.

In [18]:
# Read the ORC file, then write it back out with ORC-specific writer options.
df = spark.read.orc("users.orc")
# mode("overwrite") keeps the cell re-runnable: with the default save mode the
# write raises once users_with_options.orc already exists.
(df.write.format("orc")
    .mode("overwrite")
    .option("orc.bloom.filter.columns", "favorite_color")
    .option("orc.dictionary.key.threshold", "1.0")
    .save("users_with_options.orc"))

In [19]:
#Run SQL on files directly

Instead of using read API to load a file into DataFrame and query it,
you can query that file directly with SQL

In [20]:
# Query the Parquet file directly: the FROM clause names the file with the
# parquet.`<path>` form instead of a registered table or view.
df = spark.sql("SELECT * FROM parquet.`users.parquet`")

In [21]:
#Bucketing, Sorting and Partitioning

For file-based data sources, it is also possible to bucket
and sort or partition the output. Bucketing and sorting are applicable
only to persistent tables:

In [28]:
#df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

In [29]:
# Save users.parquet as a persistent table, partitioned by favorite_color and
# bucketed into 42 buckets by name.
df = spark.read.parquet("users.parquet")
# mode("overwrite") keeps the cell re-runnable: with the default save mode
# saveAsTable raises once people_partitioned_bucketed already exists.
(df
    .write
    .mode("overwrite")
    .partitionBy("favorite_color")
    .bucketBy(42, "name")
    .saveAsTable("people_partitioned_bucketed"))

In [30]:
# Write Parquet output partitioned by favorite_color. mode("overwrite") keeps
# the cell re-runnable: with the default save mode the write raises once
# namesPartByColor.parquet already exists.
(df.write
    .mode("overwrite")
    .partitionBy("favorite_color")
    .format("parquet")
    .save("namesPartByColor.parquet"))

In [31]:
#Parquet Files

Parquet is a columnar format that is supported by many other data processing systems.
Spark SQL provides support for both reading and writing Parquet files
that automatically preserves the schema of the original data. When
writing Parquet files, all columns are automatically converted to be 
nullable for compatibility reasons.

In [33]:
#Loading Data Programmatically

In [34]:
# Load the people JSON file into a DataFrame.
peopleDF = spark.read.format("json").load("people 2.json")

In [35]:
# DataFrames can be saved as Parquet files, maintaining the schema
# information.

In [37]:
# Save the DataFrame as Parquet. mode="overwrite" keeps the cell re-runnable:
# with the default save mode the write raises once people.parquet already
# exists.
peopleDF.write.parquet("people.parquet", mode="overwrite")

In [38]:


# Load the Parquet output written earlier in this notebook.
# Parquet is self-describing, so the schema comes back with the data,
# and the loaded result is again a DataFrame.
parquetFile = spark.read.format("parquet").load("people.parquet")

In [39]:

# Parquet files can also be used to create a temporary view and then
# used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
# Select the names of people aged 13 through 19 (both bounds inclusive).
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()

+------+
|  name|
+------+
|Justin|
+------+



In [40]:
#Schema Merging