In [1]:
from functools import reduce

import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.sql import SQLContext

## Map, Filter and Reduce in Python

In [2]:
# Materialize the range: on Python 3 range() is lazy and would display as
# `range(1, 10)` instead of the list of numbers used by the examples below.
a = list(range(1, 10))
a

[1, 2, 3, 4, 5, 6, 7, 8, 9]

In [3]:
# (start, end) pairs used by the map/filter/reduce examples below.
t = [
    (1, 5),
    (3, 9),
    (5, 8),
]
t

[(1, 5), (3, 9), (5, 8)]

### map

In [4]:
# list() forces evaluation: on Python 3 map() returns a lazy iterator.
list(map(lambda x: x ** 2, a))

[1, 4, 9, 16, 25, 36, 49, 64, 81]

In [5]:
# First element of each pair; list() needed because map() is lazy on Python 3.
list(map(lambda t: t[0], t))

[1, 3, 5]

In [6]:
# Product of each pair; list() needed because map() is lazy on Python 3.
list(map(lambda t: t[0] * t[1], t))

[5, 27, 40]

In [7]:
# Each pair becomes the list of integers in [start, end). Both map() and
# range() are lazy on Python 3, so both are materialized with list().
list(map(lambda t: list(range(t[0], t[1])), t))

[[1, 2, 3, 4], [3, 4, 5, 6, 7, 8], [5, 6, 7]]

### filter

In [8]:
# Keep multiples of 3; list() needed because filter() is lazy on Python 3.
list(filter(lambda x: x % 3 == 0, a))

[3, 6, 9]

In [9]:
# Keep pairs whose start is greater than 2; list() forces the lazy filter().
list(filter(lambda t: t[0] > 2, t))

[(3, 9), (5, 8)]

### reduce

In [10]:
# Fold the list into its sum. `reduce` is imported from functools at the top
# of the notebook because it is not a builtin on Python 3.
reduce(lambda acc, x: acc + x, a)

45

In [11]:
# Sum of the pairwise products (map then reduce). `reduce` comes from
# functools (imported at the top) since it is not a builtin on Python 3.
reduce(lambda acc, x: acc + x, map(lambda t: t[0] * t[1], t))

72

### flattening

In [12]:
from itertools import chain

# Materialize each range as a list so the nested structure prints the same on
# Python 2 and 3 (on Python 3, map() and range() are lazy, and printing a map
# object after chain() has consumed it would show nothing useful).
b = [list(range(start, end)) for start, end in [(1, 5), (10, 15)]]

# chain.from_iterable concatenates the inner lists into one flat list.
flat = list(chain.from_iterable(b))

print(b)
print(flat)

[[1, 2, 3, 4], [10, 11, 12, 13, 14]]
[1, 2, 3, 4, 10, 11, 12, 13, 14]


# Start Spark
***

## SparkContext
***

In [3]:
# getOrCreate() reuses an already-running context, so re-running this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

## Create RDD
***

### parallelize

In [4]:
# Distribute the integers 1..9 as an RDD.
numbers = range(1, 10)
list_rdd = sc.parallelize(numbers)

In [5]:
# Two (name, value) datasets keyed by the same names, so they can be joined later.
names = ["Jimmy", "Bob", "Rod"]
age = list(zip(names, [18, 22, 29]))
salary = list(zip(names, [20000, 23000, 25000]))

age_rdd = sc.parallelize(age)
salary_rdd = sc.parallelize(salary)

### textFile

In [4]:
# One RDD element per line of the text file.
persons_path = "data/persons.txt"
persons_rdd = sc.textFile(persons_path)

## Showing RDD elements
***

### first

In [18]:
# first() is an action: it triggers evaluation and returns the first element.
first_number = list_rdd.first()
first_number

1

In [117]:
first_salary = salary_rdd.first()
first_salary

('Jimmy', 20000)

In [118]:
first_age = age_rdd.first()
first_age

('Jimmy', 18)

In [5]:
# Raw text line; fields are still tab-separated at this point.
first_person_line = persons_rdd.first()
first_person_line

u'Bob\tMale\t25'

### take

In [88]:
first_five_numbers = list_rdd.take(5)
first_five_numbers

[1, 2, 3, 4, 5]

In [28]:
first_five_people = persons_rdd.take(5)
first_five_people

[u'Bob\tMale\t25',
 u'John\tMale\t23',
 u'Jimmy\tMale\t27',
 u'Marshal\tMale\t26',
 u'Tina\tFemale\t19']

### collect

In [89]:
# collect() brings every element back to the driver; fine for tiny RDDs only.
all_numbers = list_rdd.collect()
all_numbers

[1, 2, 3, 4, 5, 6, 7, 8, 9]

In [104]:
all_ages = age_rdd.collect()
all_ages

[('Jimmy', 18), ('Bob', 22), ('Rod', 29)]

In [105]:
all_salaries = salary_rdd.collect()
all_salaries

[('Jimmy', 20000), ('Bob', 23000), ('Rod', 25000)]

In [30]:
all_person_lines = persons_rdd.collect()
all_person_lines

[u'Bob\tMale\t25',
 u'John\tMale\t23',
 u'Jimmy\tMale\t27',
 u'Marshal\tMale\t26',
 u'Tina\tFemale\t19',
 u'Amy\t\tFemale\t21',
 u'Penny\tFemale\t23']

## map, filter and reduce in Spark
***

### map
***

In [92]:
# Transformation only (lazy): nothing runs until an action is called.
mapList = list_rdd.map(lambda n: n * n)

In [93]:
squares = mapList.collect()
squares

[1, 4, 9, 16, 25, 36, 49, 64, 81]

In [31]:
# keys() extracts element [0] of each (name, age) pair.
age_rdd.keys().collect()

['Jimmy', 'Bob', 'Rod']

In [32]:
# Splitting on a single tab keeps an empty field for the "Amy" record, whose
# name and gender are separated by two tabs in the source file.
split_lines = persons_rdd.map(lambda line: line.split('\t'))
split_lines.collect()

[[u'Bob', u'Male', u'25'],
 [u'John', u'Male', u'23'],
 [u'Jimmy', u'Male', u'27'],
 [u'Marshal', u'Male', u'26'],
 [u'Tina', u'Female', u'19'],
 [u'Amy', u'', u'Female', u'21'],
 [u'Penny', u'Female', u'23']]

### filter
***

In [94]:
# Keep even numbers (remainder 0 is falsy).
filterList = list_rdd.filter(lambda n: not n % 2)

In [95]:
evens = filterList.collect()
evens

[2, 4, 6, 8]

In [111]:
# Records whose salary (element [1]) exceeds 21000.
high_earners = salary_rdd.filter(lambda record: record[1] > 21000)
high_earners.collect()

[('Bob', 23000), ('Rod', 25000)]

In [112]:
# Same filter, then keep only the names via keys().
(salary_rdd
 .filter(lambda record: record[1] > 21000)
 .keys()
 .collect())

['Bob', 'Rod']

### reduce
***

In [98]:
# reduce() is an action: it folds all elements into one value on the driver.
reduceList = list_rdd.reduce(lambda total, n: total + n)
reduceList

45

In [114]:
# Sum of ages greater than 20: filter, take the value of each pair, fold.
(age_rdd
 .filter(lambda record: record[1] > 20)
 .values()
 .reduce(lambda total, years: total + years))

51

## Other reducers
***

In [100]:
squares_total = mapList.sum()
squares_total

285

In [101]:
largest_square = mapList.max()
largest_square

81

In [102]:
smallest_square = mapList.min()
smallest_square

1

In [103]:
average_square = mapList.mean()
average_square

31.666666666666668

## join
***

In [115]:
# Inner join on the name key; each result is (name, (salary, age)).
total_rdd = salary_rdd.join(age_rdd)
joined_rows = total_rdd.collect()
joined_rows

[('Jimmy', (20000, 18)), ('Bob', (23000, 22)), ('Rod', (25000, 29))]

## reduceByKey

In [33]:
# Flatten per-key value lists into (key, value) pairs, preserving order.
groups = [("A", [10, 12, 9]),
          ("B", [4, 7, 12, 11]),
          ("C", [15, 18])]
rklist = [(key, value) for key, values in groups for value in values]
rklist_rdd = sc.parallelize(rklist)

rklist_rdd.collect()

[('A', 10),
 ('A', 12),
 ('A', 9),
 ('B', 4),
 ('B', 7),
 ('B', 12),
 ('B', 11),
 ('C', 15),
 ('C', 18)]

### Find the number of items in each group

In [124]:
# Replace every value with 1, then add the ones up per key.
(rklist_rdd
 .mapValues(lambda _value: 1)
 .reduceByKey(lambda left, right: left + right)
 .collect())

[('A', 3), ('C', 2), ('B', 4)]

### Find the minimum value in each group

In [128]:
# The builtin min already takes two arguments, so it can be the reducer directly.
rklist_rdd.reduceByKey(min).collect()

[('A', 9), ('C', 15), ('B', 4)]

### Find the maximum value in each group

In [133]:
# Keep the larger of each two values seen for a key.
rklist_rdd.reduceByKey(lambda left, right: max(left, right)).collect()

[('A', 12), ('C', 18), ('B', 12)]

### Working with persons_rdd

In [56]:
def to_gender_age_pair(line):
    """Map one persons.txt line to (gender, (age, 1)) for per-gender aggregation."""
    # split() with no argument splits on runs of whitespace, so the doubled
    # tab in the "Amy" record does not produce an empty field.
    fields = line.split()
    return (fields[1], (int(fields[2]), 1))

persons_kv_rdd = persons_rdd.map(to_gender_age_pair)
persons_kv_rdd.collect()

[(u'Male', (25, 1)),
 (u'Male', (23, 1)),
 (u'Male', (27, 1)),
 (u'Male', (26, 1)),
 (u'Female', (19, 1)),
 (u'Female', (21, 1)),
 (u'Female', (23, 1))]

In [61]:
# Add ages and counts element-wise per gender: (sum_of_ages, number_of_people).
persons_kv_rdd.reduceByKey(
    lambda acc, cur: (acc[0] + cur[0], acc[1] + cur[1])
).collect()

[(u'Female', (63, 3)), (u'Male', (101, 4))]

## Writing a parser to create structured records

In [101]:
class Person(object):
    """One record of persons.txt: tab-separated name, gender and age fields."""

    def parse(self, line):
        """Parse one text line into a new Person.

        Empty fields produced by repeated tabs (the "Amy" record in
        persons.txt has two tabs between name and gender) are dropped so the
        gender and age columns line up; otherwise int() would be applied to
        the gender text and raise ValueError.

        A fresh instance is returned instead of mutating ``self``: the
        notebook maps a single shared ``person`` over every line, so
        returning ``self`` would make all parsed records alias one object.

        Raises:
            IndexError / ValueError if the line has fewer than three non-empty
            fields or the age field is not an integer.
        """
        fields = [field for field in line.split('\t') if field]
        parsed = type(self)()
        parsed.name = fields[0]
        parsed.gender = fields[1]
        parsed.age = int(fields[2])
        return parsed

In [110]:
person = Person()
# Parse each line, key by gender, and sum ages per gender.
gender_age_totals = (persons_rdd
                     .map(person.parse)
                     .map(lambda p: (p.gender, p.age))
                     .reduceByKey(lambda total, years: total + years))
gender_age_totals.collect()

[(u'Female', 63), (u'Male', 101)]

# Read Tables
***

## SQLContext
***

In [122]:
from pyspark.sql import SQLContext

In [24]:
# Entry point for DataFrame and SQL functionality, built on the SparkContext.
sqlCxt = SQLContext(sparkContext=sc)

In [127]:
# Load a JSON file into a DataFrame; the schema is inferred from the data.
people_path = 'data/people.json'
people = sqlCxt.read.json(people_path)

### show

In [128]:
# Print up to 20 rows (the show() default) as a text table.
people.show(20)

+---+-----------------+------+--------+
|age|favorite_language|gender|    name|
+---+-----------------+------+--------+
| 40|           Python|     M| Orlando|
| 39|               C#|     F|    Lina|
| 30|           Python|     M|    John|
| 32|           Python|     F|    Jane|
| 18|           Python|     F|Michelle|
| 20|               C#|     M|  Daniel|
+---+-----------------+------+--------+



### select

In [130]:
# Columns can be referenced as DataFrame attributes instead of strings.
people.select(people.name).show()

+--------+
|    name|
+--------+
| Orlando|
|    Lina|
|    John|
|    Jane|
|Michelle|
|  Daniel|
+--------+



In [131]:
# select() also accepts a list of column names; the output follows list order.
people.select(["age", "name", "gender"]).show()

+---+--------+------+
|age|    name|gender|
+---+--------+------+
| 40| Orlando|     M|
| 39|    Lina|     F|
| 30|    John|     M|
| 32|    Jane|     F|
| 18|Michelle|     F|
| 20|  Daniel|     M|
+---+--------+------+



In [135]:
# Column expressions build derived columns; the header shows the expression.
people.select("name", people["age"] * 2).show()

+--------+---------+
|    name|(age * 2)|
+--------+---------+
| Orlando|       80|
|    Lina|       78|
|    John|       60|
|    Jane|       64|
|Michelle|       36|
|  Daniel|       40|
+--------+---------+



### filter

In [136]:
# where() is an alias of filter().
people.where(people["age"] > 30).show()

+---+-----------------+------+-------+
|age|favorite_language|gender|   name|
+---+-----------------+------+-------+
| 40|           Python|     M|Orlando|
| 39|               C#|     F|   Lina|
| 32|           Python|     F|   Jane|
+---+-----------------+------+-------+



In [139]:
people.where(people["gender"] == "F").show()

+---+-----------------+------+--------+
|age|favorite_language|gender|    name|
+---+-----------------+------+--------+
| 39|               C#|     F|    Lina|
| 32|           Python|     F|    Jane|
| 18|           Python|     F|Michelle|
+---+-----------------+------+--------+



In [137]:
people.where(people["name"] == "Orlando").show()

+---+-----------------+------+-------+
|age|favorite_language|gender|   name|
+---+-----------------+------+-------+
| 40|           Python|     M|Orlando|
+---+-----------------+------+-------+



### groupby

In [141]:
# groupby() is an alias of groupBy(); count() adds a "count" column per group.
people.groupby("gender").count().show()

+------+-----+
|gender|count|
+------+-----+
|     F|    3|
|     M|    3|
+------+-----+



In [142]:
# sum() with no arguments sums every numeric column (only "age" here).
gender_sums = people.groupby("gender").sum()
gender_sums.show()

+------+--------+
|gender|sum(age)|
+------+--------+
|     F|      89|
|     M|      90|
+------+--------+



## Using SQL-like queries

### registerTempTable

In [143]:
# Expose the DataFrame to SQL queries under the table name "people".
# NOTE(review): registerTempTable is deprecated since Spark 2.0 in favour of
# createOrReplaceTempView; switch once the notebook targets Spark 2+.
people.registerTempTable(name="people")

### sql(*`query`*)

In [144]:
select_all_query = "select * from people"
sqlCxt.sql(select_all_query).show()

+---+-----------------+------+--------+
|age|favorite_language|gender|    name|
+---+-----------------+------+--------+
| 40|           Python|     M| Orlando|
| 39|               C#|     F|    Lina|
| 30|           Python|     M|    John|
| 32|           Python|     F|    Jane|
| 18|           Python|     F|Michelle|
| 20|               C#|     M|  Daniel|
+---+-----------------+------+--------+



In [145]:
# AS renames the computed column in the result.
double_age_query = "select name, age*2 AS Double_AGE from people"
sqlCxt.sql(double_age_query).show()

+--------+----------+
|    name|Double_AGE|
+--------+----------+
| Orlando|        80|
|    Lina|        78|
|    John|        60|
|    Jane|        64|
|Michelle|        36|
|  Daniel|        40|
+--------+----------+



In [154]:
oldest_first_query = "select * from people order by age desc"
sqlCxt.sql(oldest_first_query).show()

+---+-----------------+------+--------+
|age|favorite_language|gender|    name|
+---+-----------------+------+--------+
| 40|           Python|     M| Orlando|
| 39|               C#|     F|    Lina|
| 32|           Python|     F|    Jane|
| 30|           Python|     M|    John|
| 20|               C#|     M|  Daniel|
| 18|           Python|     F|Michelle|
+---+-----------------+------+--------+



In [163]:
mean_age_query = "select gender, mean(age) AS Average_of_age from people group by gender"
sqlCxt.sql(mean_age_query).show()

+------+------------------+
|gender|    Average_of_age|
+------+------------------+
|     F|29.666666666666668|
|     M|              30.0|
+------+------------------+



## Create Schema
***

In [18]:
from pyspark.sql import *
from pyspark.sql.types import *

In [27]:
def load_tsv(path):
    """Read a tab-separated text file as an RDD of per-line field lists."""
    return sc.textFile(path).map(lambda line: line.split("\t"))

# Convert each row to a tuple; only the sales quantity is numeric.
sales_rdd = load_tsv("data/sales.txt").map(lambda t: (t[0], t[1], t[2], int(t[3])))
stores_rdd = load_tsv("data/stores.txt").map(lambda t: (t[0], t[1]))
products_rdd = load_tsv("data/products.txt").map(lambda t: (t[0], t[1], t[2]))

In [28]:
# print() as a function call: `print x` is a SyntaxError on Python 3, and the
# single-argument call form prints identically on Python 2.
print(sales_rdd.first())
print(stores_rdd.first())
print(products_rdd.first())

(u'2014-01-01', u'1', u'1', 100)
(u'1', u'First store')
(u'1', u'red stuff', u'stuff')


### StructField
***

In [29]:
def non_null_fields(columns):
    """Build non-nullable StructFields from (column_name, data_type) pairs."""
    return [StructField(col_name, data_type, False) for col_name, data_type in columns]

sales_fields = non_null_fields([("day", StringType()),
                                ("store", StringType()),
                                ("product", StringType()),
                                ("quantity", IntegerType())])

stores_fields = non_null_fields([("id", StringType()),
                                 ("name", StringType())])

products_fields = non_null_fields([("id", StringType()),
                                   ("name", StringType()),
                                   ("category", StringType())])

### StructType

In [30]:
# Wrap each field list in a StructType to obtain a DataFrame schema.
sales_schema, stores_schema, products_schema = [
    StructType(field_list)
    for field_list in (sales_fields, stores_fields, products_fields)
]

### createDataFrame

In [31]:
# Pair each tuple RDD with its explicit schema.
sales_df = sqlCxt.createDataFrame(sales_rdd, schema=sales_schema)
stores_df = sqlCxt.createDataFrame(stores_rdd, schema=stores_schema)
products_df = sqlCxt.createDataFrame(products_rdd, schema=products_schema)

## *`Now use SQL-like queries`*

In [33]:
# Make the three DataFrames queryable from SQL under these table names.
for frame, table_name in [(sales_df, "sales"),
                          (stores_df, "stores"),
                          (products_df, "products")]:
    sqlCxt.registerDataFrameAsTable(frame, table_name)

In [35]:
# Triple-quoted SQL avoids backslash line continuations, which silently break
# if any trailing whitespace follows the backslash.
sqlCxt.sql("""
    SELECT day, store, product, name
    FROM sales
    JOIN stores
      ON sales.store = stores.id
""").show()

+----------+-----+-------+------------+
|       day|store|product|        name|
+----------+-----+-------+------------+
|2014-01-01|    3|      1| Third store|
|2014-01-01|    1|      1| First store|
|2014-01-01|    1|      2| First store|
|2014-01-01|    1|      3| First store|
|2014-01-01|    2|      1|Second store|
|2014-01-01|    2|      2|Second store|
+----------+-----+-------+------------+



In [39]:
# The original "SELECT \ " had a space after the backslash, so the string
# literal was never continued and the cell raised a SyntaxError. A
# triple-quoted string removes the fragile continuations entirely.
sqlCxt.sql("""
    SELECT day, store, product, quantity, name, category
    FROM sales
    JOIN products
      ON sales.product = products.id
""").show()

+----------+-----+-------+--------+----------+--------+
|       day|store|product|quantity|      name|category|
+----------+-----+-------+--------+----------+--------+
|2014-01-01|    1|      3|      54|    thingy|thingies|
|2014-01-01|    1|      1|     100| red stuff|   stuff|
|2014-01-01|    2|      1|      50| red stuff|   stuff|
|2014-01-01|    3|      1|      75| red stuff|   stuff|
|2014-01-01|    1|      2|      37|blue stuff|   stuff|
|2014-01-01|    2|      2|      40|blue stuff|   stuff|
+----------+-----+-------+--------+----------+--------+



## *`Or use pandas-like functions`*

In [41]:
from pyspark.sql import functions as fn

In [44]:
# Print up to 20 rows (the show() default).
sales_df.show(20)

+----------+-----+-------+--------+
|       day|store|product|quantity|
+----------+-----+-------+--------+
|2014-01-01|    1|      1|     100|
|2014-01-01|    1|      2|      37|
|2014-01-01|    1|      3|      54|
|2014-01-01|    2|      1|      50|
|2014-01-01|    2|      2|      40|
|2014-01-01|    3|      1|      75|
|2014-01-01|    4|      4|       1|
+----------+-----+-------+--------+



In [49]:
# Total quantity sold per product; alias() names the aggregated column.
product_totals = sales_df.groupby("product").agg(
    fn.sum("quantity").alias("total Quantity"))
product_totals.show()

+-------+--------------+
|product|total Quantity|
+-------+--------------+
|      3|            54|
|      1|           225|
|      4|             1|
|      2|            77|
+-------+--------------+



In [51]:
# Total quantity sold per store.
store_totals = (sales_df
                .groupby("store")
                .agg(fn.sum("quantity").alias("total quantity")))
store_totals.show()

+-----+--------------+
|store|total quantity|
+-----+--------------+
|    3|            75|
|    1|           191|
|    4|             1|
|    2|            90|
+-----+--------------+

