In [6]:
#from pyspark import SparkConf, SparkContext

from spark 2.0 sparkSession is being used instead of sparkContext
earlier sparkContext came with RDD api support and for sql we need sqlcontext and for streaming we need streaming context. but with SparkSession, it can be treated as single point of entry for all the spark features

SparkSession also had support for datasets and rows API

STEPS:

* READ AS AN RDD using spark context or read as DF directly, if u can perfectly split
* DF or DS (dataframe or dataset need schema so converting RDD to DF or DS needs schema)
* CONVERT INTO A DF
* create a temp view
* run sql syntax like queries or DF API


In [1]:
from pyspark.sql import SparkSession, Row

In [2]:
import collections

In [3]:
spark = SparkSession.builder.appName("Friends").getOrCreate()

In [4]:
# reads as DF directly
data = spark.read.text("./datasets/fakefriends.csv")


AnalysisException: 'org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.thrift.transport.TTransportException;'

In [None]:
type(data)


 #### show will show top 20 records and since schema is not mentioned and read as text, it will be considered as values

In [None]:
data.show()

pipelinedRDD is a subset of RDD and is returned when a map or any transformation is performed

In [12]:
# when to use cache, 
#if an action is called on RDD repetatively or couple of action, its better to cache

In [13]:
# create an RDD and then convert into dF later
data = spark.sparkContext.textFile("./datasets/fakefriends.csv")

In [14]:
type(data)

pyspark.rdd.RDD

Lets us go ahead and create a RDD with rows and add some structure

In [15]:
def rowMapper(line):
    fields = line.split(",")
    return Row(ID=int(fields[0]), 
               name = fields[1],
               age = int(fields[2]),
               numOfFriends = int(fields[3])
              )
    

In [16]:
friends_data = data.map(rowMapper) # [Row(ID=0, age=33, name=b'Will', numOfFriends=385)]

* conversion of RDD to df needs to be cached as this is an expensive operation
* when cache is used it doesnt mean the data is read from file
* it will cache when an action is called and file is read

In [17]:
# INFER SCHEMA
schemaFriends = spark.createDataFrame(friends_data).cache()

In [18]:
# register df as tables
schemaFriends.createOrReplaceTempView("friends")

In [19]:
teenagers = spark.sql("select * from friends where age >= 13 and age <=19")

In [20]:
type(teenagers)

pyspark.sql.dataframe.DataFrame

In [21]:
teenagers.show()

+---+---+-------+------------+
| ID|age|   name|numOfFriends|
+---+---+-------+------------+
| 21| 19|  Miles|         268|
| 52| 19|Beverly|         269|
| 54| 19|  Brunt|           5|
|106| 18|Beverly|         499|
|115| 18|  Dukat|         397|
|133| 19|  Quark|         265|
|136| 19|   Will|         335|
|225| 19|   Elim|         106|
|304| 19|   Will|         404|
|341| 18|   Data|         326|
|366| 19|  Keiko|         119|
|373| 19|  Quark|         272|
|377| 18|Beverly|         418|
|404| 18| Kasidy|          24|
|409| 19|    Nog|         267|
|439| 18|   Data|         417|
|444| 18|  Keiko|         472|
|492| 19|  Dukat|          36|
|494| 18| Kasidy|         194|
+---+---+-------+------------+



In [22]:
teenagers.collect()

[Row(ID=21, age=19, name='Miles', numOfFriends=268),
 Row(ID=52, age=19, name='Beverly', numOfFriends=269),
 Row(ID=54, age=19, name='Brunt', numOfFriends=5),
 Row(ID=106, age=18, name='Beverly', numOfFriends=499),
 Row(ID=115, age=18, name='Dukat', numOfFriends=397),
 Row(ID=133, age=19, name='Quark', numOfFriends=265),
 Row(ID=136, age=19, name='Will', numOfFriends=335),
 Row(ID=225, age=19, name='Elim', numOfFriends=106),
 Row(ID=304, age=19, name='Will', numOfFriends=404),
 Row(ID=341, age=18, name='Data', numOfFriends=326),
 Row(ID=366, age=19, name='Keiko', numOfFriends=119),
 Row(ID=373, age=19, name='Quark', numOfFriends=272),
 Row(ID=377, age=18, name='Beverly', numOfFriends=418),
 Row(ID=404, age=18, name='Kasidy', numOfFriends=24),
 Row(ID=409, age=19, name='Nog', numOfFriends=267),
 Row(ID=439, age=18, name='Data', numOfFriends=417),
 Row(ID=444, age=18, name='Keiko', numOfFriends=472),
 Row(ID=492, age=19, name='Dukat', numOfFriends=36),
 Row(ID=494, age=18, name='Kasidy',

Try the same using spark methods

![alt sql methods](./images/spark-sql-methods.png)

In [23]:
schemaFriends.select("name","age").filter((schemaFriends['age'] >=13) &
                                (schemaFriends['age'] <=20)).orderBy('age').show()

+-------+---+
|   name|age|
+-------+---+
| Kasidy| 18|
|   Data| 18|
| Kasidy| 18|
|   Data| 18|
|  Dukat| 18|
|Beverly| 18|
|Beverly| 18|
|  Keiko| 18|
|  Keiko| 19|
|  Quark| 19|
|    Nog| 19|
|   Will| 19|
|  Brunt| 19|
|Beverly| 19|
|  Quark| 19|
|   Will| 19|
|  Dukat| 19|
|   Elim| 19|
|  Miles| 19|
|    Nog| 20|
+-------+---+
only showing top 20 rows



-------------

### with hive context

In [24]:
from pyspark.sql import SQLContext, Row 
from pyspark.sql.types import StructField, StructType, IntegerType, StringType
from pyspark import HiveContext

In [43]:
ss = SparkSession.builder.enableHiveSupport().getOrCreate()
hivectx = HiveContext(ss.sparkContext)
#hivectx.setConf("hive.metastore.uris","thrift://localhost:10000")

In [44]:
# way to pass schema
input_data = spark.read.csv('./datasets/fakefriends.csv', 
                            schema = 'ID INT, name String, age INT, numOfFriends INT')
# -- or --

schema_friends = StructType([
    StructField(name='ID',dataType=IntegerType(),nullable=False,metadata={"info":"unique ID"}),
    StructField(name='name',dataType=StringType(),nullable=False,metadata={"info":"name"}),
    StructField(name='age',dataType=IntegerType(),nullable=True,metadata={"info":"age"}),
    StructField(name='numOfFriends',dataType=IntegerType(),nullable=True,metadata={"info":"count"})                           
])
input_data = spark.read.csv('./datasets/fakefriends.csv', 
                            schema = schema_friends
                            )
input_data.schema

StructType(List(StructField(ID,IntegerType,true),StructField(name,StringType,true),StructField(age,IntegerType,true),StructField(numOfFriends,IntegerType,true)))

* can infer schema if has column names
* to infer schema, spark has to read all values which is data intensive, so dont infer and provide schema explicitly

`input_data = spark.read.csv('./datasets/fakefriends.csv', inferSchema=True )`

In [45]:
input_data.createOrReplaceTempView('friends_fake')
input_data.schema
input_data.count()
#hivectx.cacheTable('friends_fake')

In [46]:
teenagers = hivectx.sql("select * from friends_fake where age >=13 and age <=19")
teenagers.show()

+---+-------+---+------------+
| ID|   name|age|numOfFriends|
+---+-------+---+------------+
| 21|  Miles| 19|         268|
| 52|Beverly| 19|         269|
| 54|  Brunt| 19|           5|
|106|Beverly| 18|         499|
|115|  Dukat| 18|         397|
|133|  Quark| 19|         265|
|136|   Will| 19|         335|
|225|   Elim| 19|         106|
|304|   Will| 19|         404|
|341|   Data| 18|         326|
|366|  Keiko| 19|         119|
|373|  Quark| 19|         272|
|377|Beverly| 18|         418|
|404| Kasidy| 18|          24|
|409|    Nog| 19|         267|
|439|   Data| 18|         417|
|444|  Keiko| 18|         472|
|492|  Dukat| 19|          36|
|494| Kasidy| 18|         194|
+---+-------+---+------------+



# close session


In [48]:
spark.stop()