In [None]:
#!pip install pyspark

import sys
!{sys.executable} -m pip install pyspark

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [1]:
# Initializing the spark session
spark

## Exploring Spark

To start our exploration of PySpark we are going to look at some of the basic data structures and how we can use their inbuilt functions for manipulating the data they contain. We will start off by reviewing some of the commonalities and differences between `Pandas` and `PySpark` dataframes since many `Python` developers are familiar with the `Pandas` library.

## What is a dataframe?


DataFrame is a 2-dimensional labeled data structure with columns of potentially different types. You can think of it like a spreadsheet or SQL table, or a dict of Series objects.

[Source](https://pandas.pydata.org/pandas-docs/stable/dsintro.html)

In [2]:
pd_answers_df = pd.read_csv('./data/stacksample/Answers.csv', encoding='latin1')
pd_questions_df = pd.read_csv('./data/stacksample/Questions.csv', encoding='latin1')
pd_tags_df = pd.read_csv('./data/stacksample/Tags.csv', encoding='latin1')

In [3]:
pd_answers_df.head()

Unnamed: 0,Id,OwnerUserId,CreationDate,ParentId,Score,Body
0,92,61.0,2008-08-01T14:45:37Z,90,13,"<p><a href=""http://svnbook.red-bean.com/"">Vers..."
1,124,26.0,2008-08-01T16:09:47Z,80,12,<p>I wound up using this. It is a kind of a ha...
2,199,50.0,2008-08-01T19:36:46Z,180,1,<p>I've read somewhere the human eye can't dis...
3,269,91.0,2008-08-01T23:49:57Z,260,4,"<p>Yes, I thought about that, but I soon figur..."
4,307,49.0,2008-08-02T01:49:46Z,260,28,"<p><a href=""http://www.codeproject.com/Article..."


In [4]:
pd_questions_df.head()

Unnamed: 0,Id,OwnerUserId,CreationDate,ClosedDate,Score,Title,Body
0,80,26.0,2008-08-01T13:57:07Z,,26,SQLStatement.execute() - multiple queries in o...,<p>I've written a database generation script i...
1,90,58.0,2008-08-01T14:41:24Z,2012-12-26T03:45:49Z,144,Good branching and merging tutorials for Torto...,<p>Are there any really good tutorials explain...
2,120,83.0,2008-08-01T15:50:08Z,,21,ASP.NET Site Maps,<p>Has anyone got experience creating <strong>...
3,180,2089740.0,2008-08-01T18:42:19Z,,53,Function for creating color wheels,<p>This is something I've pseudo-solved many t...
4,260,91.0,2008-08-01T23:22:08Z,,49,Adding scripting functionality to .NET applica...,<p>I have a little game written in C#. It uses...


In [5]:
pd_tags_df.head()

Unnamed: 0,Id,Tag
0,80,flex
1,80,actionscript-3
2,80,air
3,90,svn
4,90,tortoisesvn


In [6]:
pd_tags_df.dtypes

Id      int64
Tag    object
dtype: object

In [7]:
sp_answers_df = spark.read.csv('./data/stacksample/Answers.csv', 
                               sep=',', 
                               escape='"', 
                               header=True, 
                               inferSchema=True, 
                               multiLine=True)
sp_questions_df = spark.read.csv('./data/stacksample/Questions.csv', 
                               sep=',', 
                               escape='"', 
                               header=True, 
                               inferSchema=True, 
                               multiLine=True)
sp_tags_df = spark.read.csv('./data/stacksample/Tags.csv', 
                               sep=',', 
                               escape='"', 
                               header=True, 
                               inferSchema=True, 
                               multiLine=True)

In [8]:
sp_answers_df.show(5)

+---+-----------+-------------------+--------+-----+--------------------+
| Id|OwnerUserId|       CreationDate|ParentId|Score|                Body|
+---+-----------+-------------------+--------+-----+--------------------+
| 92|         61|2008-08-01 10:45:37|      90|   13|<p><a href="http:...|
|124|         26|2008-08-01 12:09:47|      80|   12|<p>I wound up usi...|
|199|         50|2008-08-01 15:36:46|     180|    1|<p>I've read some...|
|269|         91|2008-08-01 19:49:57|     260|    4|<p>Yes, I thought...|
|307|         49|2008-08-01 21:49:46|     260|   28|<p><a href="http:...|
+---+-----------+-------------------+--------+-----+--------------------+
only showing top 5 rows



In [9]:
sp_questions_df.show(3)

+---+-----------+-------------------+--------------------+-----+--------------------+--------------------+
| Id|OwnerUserId|       CreationDate|          ClosedDate|Score|               Title|                Body|
+---+-----------+-------------------+--------------------+-----+--------------------+--------------------+
| 80|         26|2008-08-01 09:57:07|                  NA|   26|SQLStatement.exec...|<p>I've written a...|
| 90|         58|2008-08-01 10:41:24|2012-12-26T03:45:49Z|  144|Good branching an...|<p>Are there any ...|
|120|         83|2008-08-01 11:50:08|                  NA|   21|   ASP.NET Site Maps|<p>Has anyone got...|
+---+-----------+-------------------+--------------------+-----+--------------------+--------------------+
only showing top 3 rows



In [10]:
sp_tags_df.show(5)

+---+--------------+
| Id|           Tag|
+---+--------------+
| 80|          flex|
| 80|actionscript-3|
| 80|           air|
| 90|           svn|
| 90|   tortoisesvn|
+---+--------------+
only showing top 5 rows



In [11]:
sp_tags_df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Tag: string (nullable = true)



## Applying a function to a dataframe column

Pandas [`.apply`](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.apply.html)

Spark [`udf`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.udf) and [`withColumn`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.udf)

In [12]:
pd_questions_df['match'] = pd_questions_df['Body'].apply(lambda x: 'python' in x)

In [13]:
pd_questions_df.head()

Unnamed: 0,Id,OwnerUserId,CreationDate,ClosedDate,Score,Title,Body,match
0,80,26.0,2008-08-01T13:57:07Z,,26,SQLStatement.execute() - multiple queries in o...,<p>I've written a database generation script i...,False
1,90,58.0,2008-08-01T14:41:24Z,2012-12-26T03:45:49Z,144,Good branching and merging tutorials for Torto...,<p>Are there any really good tutorials explain...,False
2,120,83.0,2008-08-01T15:50:08Z,,21,ASP.NET Site Maps,<p>Has anyone got experience creating <strong>...,False
3,180,2089740.0,2008-08-01T18:42:19Z,,53,Function for creating color wheels,<p>This is something I've pseudo-solved many t...,False
4,260,91.0,2008-08-01T23:22:08Z,,49,Adding scripting functionality to .NET applica...,<p>I have a little game written in C#. It uses...,False


In [14]:
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
 
udf_find_python_token = udf(lambda x: 'python' in x, BooleanType())
sp_questions_df = sp_questions_df.withColumn("match", udf_find_python_token("body"))

In [15]:
sp_questions_df.show(3)

+---+-----------+-------------------+--------------------+-----+--------------------+--------------------+-----+
| Id|OwnerUserId|       CreationDate|          ClosedDate|Score|               Title|                Body|match|
+---+-----------+-------------------+--------------------+-----+--------------------+--------------------+-----+
| 80|         26|2008-08-01 09:57:07|                  NA|   26|SQLStatement.exec...|<p>I've written a...|false|
| 90|         58|2008-08-01 10:41:24|2012-12-26T03:45:49Z|  144|Good branching an...|<p>Are there any ...|false|
|120|         83|2008-08-01 11:50:08|                  NA|   21|   ASP.NET Site Maps|<p>Has anyone got...|false|
+---+-----------+-------------------+--------------------+-----+--------------------+--------------------+-----+
only showing top 3 rows



## Joining Data

Pandas [`.merge`, `.concat`, `.join`](https://pandas.pydata.org/pandas-docs/stable/merging.html)

Spark [`.join`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=join#pyspark.sql.DataFrame.join)

Spark [`.intersect`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=join#pyspark.sql.DataFrame.intersect)

Spark [`.crossJoin`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=join#pyspark.sql.DataFrame.crossJoin)



In [16]:
pd.merge(pd_questions_df, pd_tags_df, on='Id').head()

Unnamed: 0,Id,OwnerUserId,CreationDate,ClosedDate,Score,Title,Body,match,Tag
0,80,26.0,2008-08-01T13:57:07Z,,26,SQLStatement.execute() - multiple queries in o...,<p>I've written a database generation script i...,False,flex
1,80,26.0,2008-08-01T13:57:07Z,,26,SQLStatement.execute() - multiple queries in o...,<p>I've written a database generation script i...,False,actionscript-3
2,80,26.0,2008-08-01T13:57:07Z,,26,SQLStatement.execute() - multiple queries in o...,<p>I've written a database generation script i...,False,air
3,90,58.0,2008-08-01T14:41:24Z,2012-12-26T03:45:49Z,144,Good branching and merging tutorials for Torto...,<p>Are there any really good tutorials explain...,False,svn
4,90,58.0,2008-08-01T14:41:24Z,2012-12-26T03:45:49Z,144,Good branching and merging tutorials for Torto...,<p>Are there any really good tutorials explain...,False,tortoisesvn


In [17]:
sp_questions_df.join(sp_tags_df, ['id']).show(3)

+----+-----------+-------------------+----------+-----+--------------------+--------------------+-----+----+
|  Id|OwnerUserId|       CreationDate|ClosedDate|Score|               Title|                Body|match| Tag|
+----+-----------+-------------------+----------+-----+--------------------+--------------------+-----+----+
|7880|        432|2008-08-11 11:50:51|        NA|   15|How do you open a...|<p>I want to open...|false| c++|
|7880|        432|2008-08-11 11:50:51|        NA|   15|How do you open a...|<p>I want to open...|false|file|
|7880|        432|2008-08-11 11:50:51|        NA|   15|How do you open a...|<p>I want to open...|false|  io|
+----+-----------+-------------------+----------+-----+--------------------+--------------------+-----+----+
only showing top 3 rows



In [18]:
sp_questions_df.join(sp_tags_df, ['id']).join(sp_answers_df, ['id'], how='outer').head(3)

[Row(Id=7880, OwnerUserId='432', CreationDate=datetime.datetime(2008, 8, 11, 11, 50, 51), ClosedDate='NA', Score=15, Title='How do you open a file in C++?', Body='<p>I want to open a file for reading, the C++ way. I need to be able to do it for:</p>\n\n<ul>\n<li><p>text files, which would involve some sort of read line function.</p></li>\n<li><p>binary files, which would provide a way to read raw data into a <code>char*</code> buffer.</p></li>\n</ul>\n', match=False, Tag='c++', OwnerUserId=None, CreationDate=None, ParentId=None, Score=None, Body=None),
 Row(Id=7880, OwnerUserId='432', CreationDate=datetime.datetime(2008, 8, 11, 11, 50, 51), ClosedDate='NA', Score=15, Title='How do you open a file in C++?', Body='<p>I want to open a file for reading, the C++ way. I need to be able to do it for:</p>\n\n<ul>\n<li><p>text files, which would involve some sort of read line function.</p></li>\n<li><p>binary files, which would provide a way to read raw data into a <code>char*</code> buffer.</p

## Findings Nulls

Pandas [`isnull()`](https://pandas.pydata.org/pandas-docs/version/0.23.4/generated/pandas.isnull.html)

Spark [`isNull()`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.udf)

Spark [`NullType`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.types.NullType)

Spark [`NA Functions`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameNaFunctions)

In [20]:
pd.isnull(pd_questions_df['ClosedDate'])[0:10]

0     True
1    False
2     True
3     True
4     True
5     True
6    False
7     True
8     True
9     True
Name: ClosedDate, dtype: bool

In [24]:
sp_questions_df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- OwnerUserId: string (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- ClosedDate: string (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Body: string (nullable = true)
 |-- match: boolean (nullable = true)



In [25]:
sp_questions_df.withColumn('Closed', sp_questions_df.ClosedDate.isNull()).show(10)

+---+-----------+-------------------+--------------------+-----+--------------------+--------------------+-----+------+
| Id|OwnerUserId|       CreationDate|          ClosedDate|Score|               Title|                Body|match|Closed|
+---+-----------+-------------------+--------------------+-----+--------------------+--------------------+-----+------+
| 80|         26|2008-08-01 09:57:07|                  NA|   26|SQLStatement.exec...|<p>I've written a...|false| false|
| 90|         58|2008-08-01 10:41:24|2012-12-26T03:45:49Z|  144|Good branching an...|<p>Are there any ...|false| false|
|120|         83|2008-08-01 11:50:08|                  NA|   21|   ASP.NET Site Maps|<p>Has anyone got...|false| false|
|180|    2089740|2008-08-01 14:42:19|                  NA|   53|Function for crea...|<p>This is someth...|false| false|
|260|         91|2008-08-01 19:22:08|                  NA|   49|Adding scripting ...|<p>I have a littl...|false| false|
|330|         63|2008-08-01 22:51:36|   

## Groupby

Pandas [`.groupby()`](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.groupby.html#pandas.DataFrame.groupby)

Spark [`.groupBy()`](https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy)

In [26]:
pd_tags_df.groupby('Tag').sum()[0:5]

Unnamed: 0_level_0,Id
Tag,Unnamed: 1_level_1
.a,253502670
.app,290775160
.aspxauth,93769460
.bash-profile,1199125500
.class-file,423826210


In [27]:
sp_tags_df.groupby('Tag').sum().show(5)

+--------------------+-----------+
|                 Tag|    sum(Id)|
+--------------------+-----------+
|knowledge-management|  147563640|
|              iframe|47843084300|
|        fuzzy-search| 1164098580|
|    fluent-interface|  633226040|
|           standards| 3199444790|
+--------------------+-----------+
only showing top 5 rows



# Spark [SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html#overview)

A Spark module for structured data processing. The interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations.

## Creating tables

Spark [tables](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Catalog.createTable)

In [28]:
sp_tags_df.registerTempTable("tags")
sp_questions_df.registerTempTable("questions")

## Using SQL Statements

Spark [sql](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.sql)

In [29]:
sql_df = sqlContext.sql("SELECT CreationDate from questions")
sql_df.show(3)

+-------------------+
|       CreationDate|
+-------------------+
|2008-08-01 09:57:07|
|2008-08-01 10:41:24|
|2008-08-01 11:50:08|
+-------------------+
only showing top 3 rows



In [30]:
joined_df = sqlContext.sql("SELECT t.Tag, q.Title, q.Body FROM questions q INNER JOIN tags t on q.id = t.id")
joined_df.show(3)

+----+--------------------+--------------------+
| Tag|               Title|                Body|
+----+--------------------+--------------------+
| c++|How do you open a...|<p>I want to open...|
|file|How do you open a...|<p>I want to open...|
|  io|How do you open a...|<p>I want to open...|
+----+--------------------+--------------------+
only showing top 3 rows



In [31]:
so_survey_df = spark.read.csv('./data/so-developer-survey-2017/survey_results_public.csv', 
                               sep=',', 
                               escape='"', 
                               header=True, 
                               inferSchema=True, 
                               multiLine=True)

so_survey_df.printSchema()

root
 |-- Respondent: integer (nullable = true)
 |-- Professional: string (nullable = true)
 |-- ProgramHobby: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- University: string (nullable = true)
 |-- EmploymentStatus: string (nullable = true)
 |-- FormalEducation: string (nullable = true)
 |-- MajorUndergrad: string (nullable = true)
 |-- HomeRemote: string (nullable = true)
 |-- CompanySize: string (nullable = true)
 |-- CompanyType: string (nullable = true)
 |-- YearsProgram: string (nullable = true)
 |-- YearsCodedJob: string (nullable = true)
 |-- YearsCodedJobPast: string (nullable = true)
 |-- DeveloperType: string (nullable = true)
 |-- WebDeveloperType: string (nullable = true)
 |-- MobileDeveloperType: string (nullable = true)
 |-- NonDeveloperType: string (nullable = true)
 |-- CareerSatisfaction: string (nullable = true)
 |-- JobSatisfaction: string (nullable = true)
 |-- ExCoderReturn: string (nullable = true)
 |-- ExCoderNotForMe: string (nullable = t

In [32]:
so_survey_df.select('IDE').distinct().show(5)

+--------------------+
|                 IDE|
+--------------------+
| Atom; Vim; IntelliJ|
|Android Studio; V...|
|Vim; Android Stud...|
|Emacs; Notepad++;...|
|Notepad++; TextMa...|
+--------------------+
only showing top 5 rows



In [33]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

so_survey_df.select('IDE', F.when(col('IDE').contains('Emacs'), 'Yay') \
                            .when(col('IDE').contains('Spacemacs'), 'Yay') \
                            .otherwise('Boo') \
                            .alias('analysis')).show(10)

+--------------------+--------+
|                 IDE|analysis|
+--------------------+--------+
|         Atom; Xcode|     Boo|
|Atom; Notepad++; ...|     Boo|
|Sublime Text; Vim...|     Boo|
|Notepad++; Sublim...|     Boo|
|                  NA|     Boo|
|          Emacs; Vim|     Yay|
|Sublime Text; IPy...|     Boo|
|                 Vim|     Boo|
|Vim; Visual Studi...|     Boo|
|Coda; Sublime Tex...|     Boo|
+--------------------+--------+
only showing top 10 rows



## Using UDFs

Spark SQL [UDF](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.UDFRegistration.register)

In [34]:
spark.udf.register("string_length", lambda x: len(x))
spark.sql("SELECT string_length(body) as body_length from questions").show(3)

+-----------+
|body_length|
+-----------+
|       1941|
|        249|
|        431|
+-----------+
only showing top 3 rows



## Advanced

[Broadcasting](https://spark.apache.org/docs/latest/sql-programming-guide.html#broadcast-hint-for-sql-queries)

[Caching](https://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory)

[JDBC](https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases)