In [1]:
from pyspark.sql import SparkSession
# Reuse the active SparkSession if one exists, otherwise start one with this app name.
spark = (
    SparkSession.builder
    .appName("Analyzing air line data")
    .getOrCreate()
)

24/04/09 20:16:17 WARN Utils: Your hostname, Abhijits-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.105 instead (on interface en0)
24/04/09 20:16:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/09 20:16:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/09 20:16:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/04/09 20:16:18 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


24/04/09 20:16:32 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [4]:
# Low-level SparkContext handle; the RDD API used below (parallelize, textFile) hangs off it.
sc = spark.sparkContext

## Using SQL

In [2]:
from pyspark.sql.types import Row
from datetime import datetime

In [8]:
# Two sample records that exercise Spark SQL's nested types:
# a list (-> array), a dict (-> map) and a datetime (-> timestamp).
record = sc.parallelize(
    [
        Row(
            id=1,
            name="Jill",
            active=True,
            clubs=["chess", "hockey"],
            subjects={"math": 80, "english": 56},
            enrolled=datetime(2014, 8, 1, 14, 1, 5),
        ),
        Row(
            id=2,
            name="George",
            active=True,
            clubs=["chess", "soccer"],
            subjects={"math": 60, "english": 96},
            enrolled=datetime(2015, 3, 21, 8, 2, 5),
        ),
    ]
)

In [10]:
# Turn the RDD of Rows into a DataFrame; with no schema given, Spark infers it from the Row fields.
record_df = spark.createDataFrame(record)

                                                                                

In [11]:
### To run SQL queries against a DataFrame, it must first be registered as a view.
# createOrReplaceTempView scopes the view to this SparkSession: it goes away when the
# session ends and is not visible from other sessions.
record_df.createOrReplaceTempView("records")

#### To share a view across SparkSessions, use createOrReplaceGlobalTempView instead (its lifetime is tied to the Spark application, not the session)

In [14]:
from pyspark.sql import SQLContext

# NOTE: SQLContext is kept only for backward compatibility (SparkSession replaced it in
# Spark 2.0); spark.sql(...) is the modern equivalent of sqlContext.sql(...).
sqlContext = SQLContext(sparkContext=sc)



In [16]:
### Running queries with the SQL context
# Refer to the view by the exact name it was registered under ("records"); "Records" only
# resolved before because Spark's name resolution is case-insensitive by default.
all_records_df = sqlContext.sql("SELECT * FROM records")

In [17]:
# Print up to 20 rows, truncating long cell values (these are show()'s defaults).
all_records_df.show(n=20, truncate=True)

+---+------+------+---------------+--------------------+-------------------+
| id|  name|active|          clubs|            subjects|           enrolled|
+---+------+------+---------------+--------------------+-------------------+
|  1|  Jill|  true|[chess, hockey]|{english -> 56, m...|2014-08-01 14:01:05|
|  2|George|  true|[chess, soccer]|{english -> 96, m...|2015-03-21 08:02:05|
+---+------+------+---------------+--------------------+-------------------+



In [19]:
### Complex queries: index into array and map columns
# Array indexing in Spark SQL is 0-based, so clubs[1] is each student's *second* club;
# subjects['english'] looks up the 'english' key of the map column.
sqlContext.sql("SELECT id, clubs[1], subjects['english'] FROM records").show()


+---+--------+-----------------+
| id|clubs[1]|subjects[english]|
+---+--------+-----------------+
|  1|  hockey|               56|
|  2|  soccer|               96|
+---+--------+-----------------+



In [20]:
### Map values can also be used in predicates, e.g. students scoring above 90 in English
sqlContext.sql("SELECT * FROM records WHERE subjects['english'] > 90").show()


+---+------+------+---------------+--------------------+-------------------+
| id|  name|active|          clubs|            subjects|           enrolled|
+---+------+------+---------------+--------------------+-------------------+
|  2|George|  true|[chess, soccer]|{english -> 96, m...|2015-03-21 08:02:05|
+---+------+------+---------------+--------------------+-------------------+



In [None]:
### To query a global temp view, qualify its name with the "global_temp" database, e.g. SELECT * FROM global_temp.records

## Schemas in Spark SQL

In [21]:
### Even without an externally defined schema, Spark SQL infers one from the data (an inferred schema)
all_records_df.schema

StructType([StructField('id', LongType(), True), StructField('name', StringType(), True), StructField('active', BooleanType(), True), StructField('clubs', ArrayType(StringType(), True), True), StructField('subjects', MapType(StringType(), LongType(), True), True), StructField('enrolled', TimestampType(), True)])

In [23]:
## We can also define the schema explicitly instead of relying on inference.

# NOTE: absolute local path; adjust for your machine.
lines = sc.textFile("/Volumes/T7/GettingStartedSpark2/students.txt")

# Split each comma-separated line into fields. Blank lines are skipped so they do not
# turn into malformed one-field records, and surrounding whitespace is trimmed.
parts = (
    lines
    .filter(lambda line: line.strip() != "")
    .map(lambda line: [field.strip() for field in line.split(',')])
)

# Preview a few records instead of collecting the whole RDD onto the driver.
parts.take(5)

[['Emily', '44', '55', '78'],
 ['Andy', '47', '34', '89'],
 ['Rick', '55', '78', '55'],
 ['Aaron', '66', '34', '98']]

In [24]:
# Column names of students.txt, in file order.
schemaString = "name math english science"

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Build the fields from schemaString rather than repeating the column names by hand,
# so the two cannot drift apart: "name" is a string, the remaining columns are LongType.
fields = [
    StructField(col_name, StringType() if col_name == "name" else LongType(), True)
    for col_name in schemaString.split()
]


In [26]:
# Wrap the field list into the StructType that createDataFrame expects.
schema = StructType(fields=fields)

In [27]:
# textFile + split yields every column as a string, but the schema declares the score
# columns as LongType. Convert them to int first: createDataFrame verifies RDD rows
# lazily, so passing raw strings only fails later, at the first action (show/collect),
# with a type-verification error.
schemaStudents = spark.createDataFrame(
    parts.map(lambda p: (p[0], int(p[1]), int(p[2]), int(p[3]))),
    schema,
)

In [28]:
# The DataFrame now carries the explicitly supplied schema rather than an inferred one.
schemaStudents.schema

StructType([StructField('name', StringType(), True), StructField('math', LongType(), True), StructField('english', LongType(), True), StructField('science', LongType(), True)])

## Window Functions

In [None]:
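### Window functions: frame specifications
#### rowsBetween(-1, 1) -> physical-row frame: for row x it covers the row before x, x itself, and the row after x (within x's partition)

### rangeBetween(0, 50) -> value-based frame: it covers every row in the partition whose ORDER BY value lies between the current row's value and 50 units further along in the sort order (not "the next 50 rows"); it needs a single numeric ORDER BY column

### rangeBetween(0, sys.maxsize) -> goes from the current row's value to the end of the partition (not the whole table); Window.unboundedFollowing is the idiomatic way to write this bound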

In [30]:
# Without inferSchema every CSV column is read as a string, so `price` would sort
# lexicographically ("1000" < "999") and could not be used with value-based
# rangeBetween frames. inferSchema makes Spark detect numeric columns.
products = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/Volumes/T7/GettingStartedSpark2/products.csv")
)

In [31]:
# Print the first 20 rows (show()'s default row count).
products.show(n=20)

+----------+--------+-----+
|   product|category|price|
+----------+--------+-----+
|Samsung TX|  Tablet|  999|
|Samsung JX|  Mobile|  799|
|Redmi Note|  Mobile|  399|
|        Mi|  Mobile|  299|
|      iPad|  Tablet|  789|
|    iPhone|  Mobile|  999|
|  Micromax|  Mobile|  249|
|    Lenovo|  Tablet|  499|
|   OnePlus|  Mobile|  356|
|        Xu|  Tablet|  267|
+----------+--------+-----+



In [32]:
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

In [34]:
# One window per category, with rows ordered from most to least expensive.
windowSpec1 = (
    Window
    .partitionBy(products["category"])
    .orderBy(products["price"].desc())
)

## We partition the products by category (i.e., Mobile or Tablet) and then, within each partition, order them by price in descending order

In [39]:
# Rank column expression evaluated over windowSpec1 (per category, by descending price).
price_rank = (
    func.rank()
    .over(windowSpec1)
)

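### rank() gives each product its position within its category, ordered by descending price; tied prices share the same rank, and the following rank is skipped (use dense_rank() for no gaps)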

In [41]:
#### Using rowsBetween
# Same partitioning and ordering as windowSpec1, but each row's frame is limited to
# the previous row, the current row and the next row.
windowSpec2 = (
    Window
    .partitionBy(products["category"])
    .orderBy(products["price"].desc())
    .rowsBetween(-1, 1)
)