# SparkSQL and Spark DataFrames

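First we need to import the SQLContext class, the entry point for DataFrame and SQL functionality in Spark, and create an instance of it from the existing SparkContext sc. (From Spark 2.0 onwards, SparkSession supersedes SQLContext, which is kept for backward compatibility.)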

In [3]:
from pyspark.sql import SQLContext

sqlsc = SQLContext(sc)

Next we create a Spark DataFrame from the gameclicks table in our PostgreSQL database. Spark reads the table through JDBC (Java Database Connectivity), so the PostgreSQL JDBC driver must be available to Spark.

In [5]:
df = sqlsc.read.format("jdbc")\
.option("url", "jdbc:postgresql://localhost/cloudera?user=cloudera")\
.option("dbtable", "gameclicks")\
.load()

# Check that the schema was loaded correctly
df.printSchema()

root
 |-- timestamp: timestamp (nullable = false)
 |-- clickid: integer (nullable = false)
 |-- userid: integer (nullable = false)
 |-- usersessionid: integer (nullable = false)
 |-- ishit: integer (nullable = false)
 |-- teamid: integer (nullable = false)
 |-- teamlevel: integer (nullable = false)
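The url option above packs the host, the database name and the connection properties (here just user) into a single string. As a hedged illustration, here is a small hypothetical helper, build_jdbc_url (not part of Spark or of the PostgreSQL driver), that assembles a URL of the form jdbc:postgresql://host[:port]/database?key=value from its parts:

```python
from urllib.parse import urlencode


def build_jdbc_url(host, database, port=None, **params):
    """Hypothetical helper: assemble a PostgreSQL JDBC URL of the form
    jdbc:postgresql://host[:port]/database?key=value&...
    Spark itself only needs the final string."""
    netloc = host if port is None else f"{host}:{port}"
    url = f"jdbc:postgresql://{netloc}/{database}"
    if params:
        # Connection properties are URL-encoded as a query string.
        url += "?" + urlencode(params)
    return url


# Reproduces the URL used in the cell above.
print(build_jdbc_url("localhost", "cloudera", user="cloudera"))
```

Spark's JDBC source also accepts user and password as separate .option() calls, which keeps credentials out of the URL string.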



In [6]:
# count() returns the number of rows in the DataFrame
df.count()

755806

In [8]:
# select() returns a new DataFrame with only the named columns; show(5) prints its first 5 rows
df.select("userid", "teamlevel", "teamid").show(5)

+------+---------+------+
|userid|teamlevel|teamid|
+------+---------+------+
|  1038|        1|    25|
|  1099|        1|    44|
|   899|        1|    71|
|  2197|        1|    99|
|  1362|        1|    13|
+------+---------+------+
only showing top 5 rows



#### Filter rows based on criteria

We can also keep only the rows that match a condition by using the **filter()** method. In the following example we display the userid and teamlevel of the players whose teamlevel is greater than 1.

In [11]:
df.filter(df["teamlevel"]>1).select("userid","teamlevel").show(5)

# head(n) returns the first n rows as a list of Row objects; tail(n), available from Spark 3.0, returns the last n

+------+---------+
|userid|teamlevel|
+------+---------+
|  1513|        2|
|   868|        2|
|  1453|        2|
|  1282|        2|
|  1473|        2|
+------+---------+
only showing top 5 rows
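To make the semantics of filter() followed by select() concrete, here is a plain-Python analogy on a few hypothetical toy rows (not taken from the gameclicks table): filtering keeps whole rows that satisfy the predicate, and selecting then keeps only the named columns of those rows.

```python
# Hypothetical toy rows standing in for gameclicks records.
rows = [
    {"userid": 1, "teamlevel": 1, "teamid": 10},
    {"userid": 2, "teamlevel": 2, "teamid": 20},
    {"userid": 3, "teamlevel": 3, "teamid": 30},
]

# Analogue of filter(df["teamlevel"] > 1): keep rows where the predicate holds.
filtered = [r for r in rows if r["teamlevel"] > 1]

# Analogue of select("userid", "teamlevel"): keep only the named columns.
projected = [{"userid": r["userid"], "teamlevel": r["teamlevel"]} for r in filtered]

print(projected)
```

Unlike these list comprehensions, Spark evaluates filter() and select() lazily: nothing is read from PostgreSQL until an action such as show() or count() runs.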



#### Group by a column and count

The **groupBy()** method groups together the rows that share the same value in one or more columns. In this table the ishit column only takes the values 0 and 1, so we can count how many times each value occurs by grouping on ishit and counting the rows in each group:

In [12]:
df.groupBy("ishit").count().show()

+-----+------+
|ishit| count|
+-----+------+
|    0|672423|
|    1| 83383|
+-----+------+
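As a rough plain-Python analogy (hypothetical values, not the real table), groupBy("ishit").count() produces one output row per distinct ishit value together with its frequency, which is what collections.Counter computes for a list:

```python
from collections import Counter

# Hypothetical ishit values for a handful of clicks (0 = miss, 1 = hit).
ishit = [0, 1, 0, 0, 1, 0]

# One entry per distinct value, mapped to how often it occurs.
counts = Counter(ishit)

# Print in key order, one "value count" pair per line.
for value in sorted(counts):
    print(value, counts[value])
```

Spark does not guarantee the order of the groups in its result; appending .orderBy("ishit") before show() makes the output order deterministic.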



#### Aggregate operations

Aggregate operations can also be applied to DataFrame columns. The aggregate functions live in the pyspark.sql.functions module, so we import them first. We can then, for example, compute the average and the total of a column with the **mean()** and **sum()** functions.

In [13]:
from pyspark.sql.functions import mean, sum  # note: this sum shadows Python's built-in sum

#Calculating the average of ishit
df.select(mean("ishit")).show()

+------------------+
|        avg(ishit)|
+------------------+
|0.1103232840173272|
+------------------+



In [15]:
#Calculating sum of ishit
df.select(sum("ishit")).show()

+----------+
|sum(ishit)|
+----------+
|     83383|
+----------+
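The numbers printed so far are consistent with each other, and checking them is a useful sanity test. The short worked example below uses only figures copied from the outputs above: the two ishit groups add up to the total row count, the sum of a 0/1 column is the number of 1s, and its mean is therefore the hit rate, sum divided by count.

```python
# Figures copied from the notebook outputs above.
total_rows = 755806                   # df.count()
group_counts = {0: 672423, 1: 83383}  # df.groupBy("ishit").count()
hit_sum = 83383                       # df.select(sum("ishit"))

# The groups partition the table, so their sizes add up to the row count.
assert group_counts[0] + group_counts[1] == total_rows

# ishit is 0/1, so its sum is simply the number of hits.
assert hit_sum == group_counts[1]

# Its mean is the fraction of clicks that were hits: sum / count.
hit_rate = hit_sum / total_rows
print(hit_rate)
```

The computed hit_rate matches the avg(ishit) value reported by mean() above.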



#### Join two dataframes

We can join two DataFrames on a common column. First, let's create a second DataFrame from another table, adclicks, which shares several columns with gameclicks, including userid.

In [16]:
df2 = sqlsc.read.format("jdbc")\
.option("url", "jdbc:postgresql://localhost/cloudera?user=cloudera")\
.option("dbtable", "adclicks")\
.load()

df2.printSchema()

root
 |-- timestamp: timestamp (nullable = false)
 |-- txid: integer (nullable = false)
 |-- usersessionid: integer (nullable = false)
 |-- teamid: integer (nullable = false)
 |-- userid: integer (nullable = false)
 |-- adid: integer (nullable = false)
 |-- adcategory: string (nullable = false)



Now we join both DataFrames on the userid column with the **join()** method, which performs an inner join by default, and store the resulting DataFrame in a variable called merge_result.
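Conceptually, an inner join on userid pairs every gameclicks row with every adclicks row that has the same userid, so a user with several ad clicks shows up in several output rows, and users with no match on the other side disappear. The sketch below is a plain-Python hash join over hypothetical miniature tables; Spark chooses its own physical strategy (for example sort-merge or broadcast hash join), so this only illustrates the result, not Spark's implementation.

```python
from collections import defaultdict

# Hypothetical miniature versions of the two tables (only a few columns).
gameclicks = [
    {"userid": 1, "clickid": 10, "teamid": 5},
    {"userid": 2, "clickid": 11, "teamid": 6},
]
adclicks = [
    {"userid": 1, "adid": 27, "teamid": 5},
    {"userid": 1, "adid": 4, "teamid": 5},
    {"userid": 3, "adid": 9, "teamid": 8},
]


def inner_join(left, right, key):
    """Hash join: index the right side by key, then probe it with each left row."""
    index = defaultdict(list)
    for right_row in right:
        index[right_row[key]].append(right_row)
    # Each result pairs one left row with one matching right row. Keeping the
    # pair as a tuple avoids silently overwriting columns such as "teamid"
    # that exist in both tables.
    return [
        (left_row, right_row)
        for left_row in left
        for right_row in index.get(left_row[key], [])
    ]


pairs = inner_join(gameclicks, adclicks, "userid")
for left_row, right_row in pairs:
    print(left_row["clickid"], right_row["adid"])
```

User 1 appears twice because it has two ad clicks, while users 2 and 3 are dropped because they have no match in the other table.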

In [17]:
merge_result = df.join(df2, 'userid')

# merge_result is itself a DataFrame, so we can inspect its schema the same way
merge_result.printSchema()

root
 |-- userid: integer (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- clickid: integer (nullable = false)
 |-- usersessionid: integer (nullable = false)
 |-- ishit: integer (nullable = false)
 |-- teamid: integer (nullable = false)
 |-- teamlevel: integer (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- txid: integer (nullable = false)
 |-- usersessionid: integer (nullable = false)
 |-- teamid: integer (nullable = false)
 |-- adid: integer (nullable = false)
 |-- adcategory: string (nullable = false)



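The resulting DataFrame has all the columns of both tables, gameclicks and adclicks. The join column userid appears only once, but the other columns the two tables share (timestamp, usersessionid and teamid) appear twice, once from each table. Finally, let's look at the contents of the joined DataFrame: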

In [19]:
merge_result.show(5)

+------+--------------------+-------+-------------+-----+------+---------+--------------------+-----+-------------+------+----+----------+
|userid|           timestamp|clickid|usersessionid|ishit|teamid|teamlevel|           timestamp| txid|usersessionid|teamid|adid|adcategory|
+------+--------------------+-------+-------------+-----+------+---------+--------------------+-----+-------------+------+----+----------+
|   231|2016-06-08 00:45:...| 376796|        23626|    0|   142|        4|2016-06-08 01:40:...|23669|        23626|   142|  27|     games|
|   231|2016-06-08 00:45:...| 376796|        23626|    0|   142|        4|2016-06-08 09:24:...|24122|        23626|   142|   4|     games|
|   231|2016-06-08 00:45:...| 376796|        23626|    0|   142|        4|2016-06-08 17:21:...|24659|        23626|   142|  22| computers|
|   231|2016-06-08 00:45:...| 376796|        23626|    0|   142|        4|2016-06-08 23:34:...|25076|        23626|   142|  21|    movies|
|   231|2016-06-08 00:45:..