In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [2]:
import findspark # You will need to pip install findspark
findspark.init()

In [3]:
from pyspark import SparkContext
sc = SparkContext("local", "Tutorial_02") # Spark UI at http://localhost:4040/jobs/

from pyspark.sql import SQLContext, HiveContext
#sqlContext = SQLContext(sc)
sqlContext = HiveContext(sc)

In [4]:
SOURCE = 'sales_sample.json'
df = sqlContext.read.json(SOURCE)
df.printSchema()
df.count()

root
 |-- invoice: string (nullable = true)
 |-- sales: double (nullable = true)
 |-- team: string (nullable = true)



40

In [5]:
df.registerTempTable('sales')

In [6]:
sqlContext.sql('''
SELECT team, invoice FROM sales WHERE sales > 250000
''').toPandas()

Unnamed: 0,team,invoice
0,TEAM-5,TESTCO2016001
1,TEAM-5,TESTCO2016003
2,TEAM-5,TESTCO2016004
3,TEAM-3,TESTCO2016007
4,TEAM-5,TESTCO2016008
5,TEAM-5,TESTCO2016009
6,TEAM-5,TESTCO2016010
7,TEAM-3,TESTCO2016011
8,TEAM-4,TESTCO2016012
9,TEAM-3,TESTCO2016013


In [18]:
sqlContext.sql('''
SELECT team, sum(sales) as total_sales
FROM sales GROUP BY team
ORDER BY total_sales DESC
''').toPandas()

Unnamed: 0,team,total_sales
0,TEAM-5,6266574.18
1,TEAM-3,2185617.27
2,TEAM-2,2110930.15
3,TEAM-4,1268761.0
4,TEAM-1,1158811.3


In [21]:
sqlContext.sql('''
SELECT team, avg(sales) as avg_sale
FROM sales GROUP BY team
ORDER BY avg_sale DESC
''').toPandas()

Unnamed: 0,team,avg_sale
0,TEAM-5,522214.515
1,TEAM-4,422920.333333
2,TEAM-3,312231.038571
3,TEAM-2,234547.794444
4,TEAM-1,128756.811111


In [31]:
from pyspark.sql.functions import *

In [24]:
xdf = sqlContext.createDataFrame([([2, 1, 3],),([1],),([],)], ['data'])

In [26]:
xdf.show()

+---------+
|     data|
+---------+
|[2, 1, 3]|
|      [1]|
|       []|
+---------+



In [32]:
xdf.select(sort_array(xdf.data, asc=False).alias('r')).collect()

[Row(r=[3, 2, 1]), Row(r=[1]), Row(r=[])]

In [49]:
df.agg(approxCountDistinct(df['team']).alias('team_count')).collect()[0].asDict().values()

[5]

In [62]:
df.describe('sales').show()

+-------+------------------+
|summary|             sales|
+-------+------------------+
|  count|                40|
|   mean|324767.34749999986|
| stddev|156694.35210249847|
|    min|         101817.08|
|    max|         594960.07|
+-------+------------------+



In [63]:
df.select([mean('sales'), min('sales'), max('sales')]).show()


+------------------+----------+----------+
|        avg(sales)|min(sales)|max(sales)|
+------------------+----------+----------+
|324767.34749999986| 101817.08| 594960.07|
+------------------+----------+----------+



In [67]:
df.select([sqrt('sales')]).show(4)

+-----------------+
|      SQRT(sales)|
+-----------------+
|672.5195090107052|
|472.9546384168359|
|704.7124945678202|
|743.7008336690232|
+-----------------+
only showing top 4 rows



In [92]:
df.select(sum('sales').alias('total_sales')).show()

+--------------------+
|         total_sales|
+--------------------+
|1.2990693899999995E7|
+--------------------+



In [73]:
df.select(sum('sales').alias('total_sales')).collect()[0].asDict()

{'total_sales': 12990693.899999995}

In [80]:
from pyspark.sql.types import *
to_gbp = udf(lambda s: 1.3*s, FloatType())

In [91]:
df.select(to_gbp('sales').alias('sales_gbp')).show(5)

+---------+
|sales_gbp|
+---------+
|587967.25|
| 290791.9|
| 645605.6|
| 719018.2|
| 132362.2|
+---------+
only showing top 5 rows



In [107]:
x = df.map(lambda s: (s['team'], s['sales']))
type(x)

pyspark.rdd.PipelinedRDD

In [110]:
x.reduceByKey(lambda accum, n: accum + n).collect()

[(u'TEAM-2', 2110930.1500000004),
 (u'TEAM-3', 2185617.27),
 (u'TEAM-1', 1158811.3),
 (u'TEAM-4', 1268761.0),
 (u'TEAM-5', 6266574.180000001)]

In [114]:
a = x.aggregateByKey((0,0), lambda a,b: (a[0] + b, a[1] + 1), lambda a,b: (a[0] + b[0], a[1] + b[1]))
a.collect()

[(u'TEAM-2', (2110930.1500000004, 9)),
 (u'TEAM-3', (2185617.27, 7)),
 (u'TEAM-1', (1158811.3, 9)),
 (u'TEAM-4', (1268761.0, 3)),
 (u'TEAM-5', (6266574.180000001, 12))]

In [115]:
b = a.mapValues(lambda v: v[0]/v[1]).collect()

In [116]:
b

[(u'TEAM-2', 234547.79444444447),
 (u'TEAM-3', 312231.03857142857),
 (u'TEAM-1', 128756.81111111112),
 (u'TEAM-4', 422920.3333333333),
 (u'TEAM-5', 522214.5150000001)]

In [97]:
#from pyspark.sql.window import Window

In [122]:
df.select(
    'sales',
    (1.30*df['sales']).alias('gbp')
).show(2)

+---------+----------+
|    sales|       gbp|
+---------+----------+
|452282.49|587967.237|
|223686.09|290791.917|
+---------+----------+
only showing top 2 rows



In [125]:
def usd_to_gbp(usd):
    return 1.30*usd

In [135]:
df.select(
    df['sales'].alias('sales_usd'),
    (usd_to_gbp(df['sales'])).alias('sales_gbp')
) \
.describe() \
.show()

+-------+------------------+------------------+
|summary|         sales_usd|         sales_gbp|
+-------+------------------+------------------+
|  count|                40|                40|
|   mean|324767.34749999986| 422197.5517500001|
| stddev|156694.35210249847|203702.65773324805|
|    min|         101817.08|        132362.204|
|    max|         594960.07|        773448.091|
+-------+------------------+------------------+



In [136]:
sqlContext.sql('''
SELECT team, sum(sales) as total_sales
FROM sales GROUP BY team
ORDER BY total_sales DESC
''').toPandas()

Unnamed: 0,team,total_sales
0,TEAM-5,6266574.18
1,TEAM-3,2185617.27
2,TEAM-2,2110930.15
3,TEAM-4,1268761.0
4,TEAM-1,1158811.3


In [172]:
foo = sqlContext.sql('''
SELECT team, sum(sales) as total_sales
FROM sales GROUP BY team
ORDER BY total_sales DESC
''')
foo.registerTempTable('foo_table')

In [185]:
from pyspark.sql.window import Window

In [194]:
df.filter(df['team']=='TEAM-5').show()

+-------------+---------+------+
|      invoice|    sales|  team|
+-------------+---------+------+
|TESTCO2016001|452282.49|TEAM-5|
|TESTCO2016003| 496619.7|TEAM-5|
|TESTCO2016004|553090.93|TEAM-5|
|TESTCO2016008|542159.19|TEAM-5|
|TESTCO2016009|456825.13|TEAM-5|
|TESTCO2016010|501443.96|TEAM-5|
|TESTCO2016014|560580.18|TEAM-5|
|TESTCO2016017|453861.42|TEAM-5|
|TESTCO2016025|579567.67|TEAM-5|
|TESTCO2016028|523371.07|TEAM-5|
|TESTCO2016032|594960.07|TEAM-5|
|TESTCO2016036|551812.37|TEAM-5|
+-------------+---------+------+



In [195]:
df.where(df['team']=='TEAM-5').show()

+-------------+---------+------+
|      invoice|    sales|  team|
+-------------+---------+------+
|TESTCO2016001|452282.49|TEAM-5|
|TESTCO2016003| 496619.7|TEAM-5|
|TESTCO2016004|553090.93|TEAM-5|
|TESTCO2016008|542159.19|TEAM-5|
|TESTCO2016009|456825.13|TEAM-5|
|TESTCO2016010|501443.96|TEAM-5|
|TESTCO2016014|560580.18|TEAM-5|
|TESTCO2016017|453861.42|TEAM-5|
|TESTCO2016025|579567.67|TEAM-5|
|TESTCO2016028|523371.07|TEAM-5|
|TESTCO2016032|594960.07|TEAM-5|
|TESTCO2016036|551812.37|TEAM-5|
+-------------+---------+------+



In [7]:
from pyspark.sql.window import Window

In [8]:
w = Window.partitionBy(df['team']).orderBy(df['sales'])

In [10]:
from pyspark.sql.functions import *

In [15]:
x = df.select('team', 'invoice', 'sales', percentRank().over(w).alias('pct_rank'), ntile(4).over(w).alias('ntile4'))

In [16]:
x.where(x['pct_rank']==1.0).show()

+------+-------------+---------+--------+------+
|  team|      invoice|    sales|pct_rank|ntile4|
+------+-------------+---------+--------+------+
|TEAM-1|TESTCO2016031|157618.02|     1.0|     4|
|TEAM-2|TESTCO2016037|273332.59|     1.0|     4|
|TEAM-3|TESTCO2016033|346253.33|     1.0|     4|
|TEAM-4|TESTCO2016035|440564.78|     1.0|     3|
|TEAM-5|TESTCO2016032|594960.07|     1.0|     4|
+------+-------------+---------+--------+------+



In [25]:
y = x.select('invoice', 'team', 'sales', 'pct_rank', abs(0.5 - x['pct_rank']).alias('epsilon'))
y.show()

+-------------+------+---------+-------------------+-------------------+
|      invoice|  team|    sales|           pct_rank|            epsilon|
+-------------+------+---------+-------------------+-------------------+
|TESTCO2016005|TEAM-1|101817.08|                0.0|                0.5|
|TESTCO2016006|TEAM-1|109938.58|              0.125|              0.375|
|TESTCO2016023|TEAM-1|110517.42|               0.25|               0.25|
|TESTCO2016019|TEAM-1|113095.44|              0.375|              0.125|
|TESTCO2016030|TEAM-1| 132661.2|                0.5|                0.0|
|TESTCO2016024|TEAM-1|139834.95|              0.625|              0.125|
|TESTCO2016029|TEAM-1|144630.01|               0.75|               0.25|
|TESTCO2016034|TEAM-1| 148698.6|              0.875|              0.375|
|TESTCO2016031|TEAM-1|157618.02|                1.0|                0.5|
|TESTCO2016020|TEAM-2| 199188.7|                0.0|                0.5|
|TESTCO2016027|TEAM-2|216324.01|              0.125

In [28]:
y.filter(y['epsilon']<0.125).show()

+-------------+------+---------+-------------------+--------------------+
|      invoice|  team|    sales|           pct_rank|             epsilon|
+-------------+------+---------+-------------------+--------------------+
|TESTCO2016030|TEAM-1| 132661.2|                0.5|                 0.0|
|TESTCO2016002|TEAM-2|223686.09|                0.5|                 0.0|
|TESTCO2016011|TEAM-3|309942.68|                0.5|                 0.0|
|TESTCO2016012|TEAM-4|429917.75|                0.5|                 0.0|
|TESTCO2016028|TEAM-5|523371.07|0.45454545454545453| 0.04545454545454547|
|TESTCO2016008|TEAM-5|542159.19| 0.5454545454545454|0.045454545454545414|
+-------------+------+---------+-------------------+--------------------+

