#### Starting Point: SparkSession

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

In [7]:
# Obtain the application's SparkSession, creating it if none is active yet.
spark = (
    SparkSession.builder
    .appName("Spark SQL")
    .getOrCreate()
)

In [11]:
# Legacy entry point: since Spark 2.0, SQLContext is superseded by SparkSession
# and kept only for backward compatibility; new code should use `spark` directly.
# Its first argument must be a SparkContext (not a SparkSession), so pass the
# session's context and hand over the existing session explicitly.
sqlContext = SQLContext(spark.sparkContext, sparkSession=spark)

#### Creating DataFrames

In [12]:
# Location of the sample input. spark.read.json expects JSON Lines by default
# (one JSON object per line).
PEOPLE_JSON_PATH = "/Github_repo/Data-Engineering/Data-Streaming-101/Spark_projects/people.json"
df = spark.read.json(PEOPLE_JSON_PATH)

In [13]:
# Render the DataFrame as an ASCII table on stdout (first 20 rows, the default).
df.show(n=20)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### Untyped Dataset Operations (aka DataFrame Operations)

In [14]:
# spark, df are from the previous example
# Print the schema in a tree format: each column's name, Spark SQL type and
# nullability (see the output below).
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [15]:
# Project the DataFrame down to the single "name" column and display it.
df.select(df["name"]).show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [16]:
# Keep every row, showing each name next to age + 1 (a null age stays null).
age_plus_one = df["age"] + 1
df.select(df["name"], age_plus_one).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [17]:
# Keep only rows whose age is strictly greater than 21 (`where` is an alias of `filter`).
df.where(df["age"] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [18]:
# Number of people per distinct age value (null ages form their own group).
# Row order of an aggregation result is not guaranteed.
df.groupBy(df["age"]).count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



#### Running SQL Queries Programmatically

In [19]:
# Expose the DataFrame to SQL as the session-scoped view "people"
# (replacing any existing view with that name).
df.createOrReplaceTempView(name="people")

In [20]:
# Query the temporary view with plain SQL; the result is itself a DataFrame.
query = "SELECT * FROM people"
sqlDF = spark.sql(query)
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### Global Temporary View

In [21]:
# Register the DataFrame as a global temporary view.
# It lives in the system database `global_temp` and is shared by all sessions of
# this Spark application. createGlobalTempView raises an error if the view
# already exists (e.g. when this cell is re-run), so use the OrReplace variant
# to keep the cell idempotent.
df.createOrReplaceGlobalTempView("people")

In [22]:
# Global temporary views must be referenced through the system-preserved
# database `global_temp`.
global_people = spark.sql("SELECT * FROM global_temp.people")
global_people.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [23]:
# A brand-new session of the same application can still see the global view,
# which demonstrates that it is cross-session.
fresh_session = spark.newSession()
fresh_session.sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### Inferring the Schema Using Reflection

In [24]:
from pyspark.sql import Row

In [25]:
# The SparkContext behind the session; needed for the RDD API used below (sc.textFile).
sc = spark.sparkContext

In [26]:
# Load a text file and convert each line to a Row.
# Each non-blank line is expected to be "<name>,<age>"; int() tolerates
# surrounding whitespace in the age field.
PEOPLE_TXT_PATH = "/Github_repo/Data-Engineering/Data-Streaming-101/Spark_projects/people.txt"
lines = sc.textFile(PEOPLE_TXT_PATH)
# Drop blank lines first: splitting one yields [""], and p[1] would then raise
# IndexError inside the Spark job.
parts = lines.filter(lambda l: l.strip()).map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0].strip(), age=int(p[1])))

In [27]:
# Build a DataFrame from the RDD of Rows; Spark infers the column names and
# types from the Row fields.
schemaPeople = spark.createDataFrame(data=people)
# Register it for SQL. Note: this replaces the "people" view created earlier
# from the JSON DataFrame.
schemaPeople.createOrReplaceTempView(name="people")

In [28]:
# Any DataFrame registered as a view can be queried with SQL; select the
# names of people aged 13 to 19 inclusive.
teen_query = "SELECT name FROM people WHERE age >= 13 AND age <= 19"
teenagers = spark.sql(teen_query)

In [29]:
# SQL query results are DataFrames. Collecting one returns a list of Row
# objects whose fields are accessible as attributes.
teenNames = ["Name: " + row.name for row in teenagers.collect()]
for name in teenNames:
    print(name)

Name: Justin


***