### Notes
* 'select' behaves kinda like a lambda

In [32]:
import numpy as np
import pandas as pd
import string

### Row - kinda like python dict

In [19]:
from pyspark.sql import Row
row = Row(name='zadie',cuteness='ridic', age=1.5)
print row['age']
print row[1], row[2], row[0] # alphabetical?

1.5
ridic zadie 1.5


### spark dataframe from python list of tuples

In [23]:
data = [('Seiji', 32), ('Stephie', 30), ('Zadie', 1)]
sqlc = SQLContext(sc)
df = sqlc.createDataFrame(data, ['name','age'])
df.show()

+-------+---+
|   name|age|
+-------+---+
|  Seiji| 32|
|Stephie| 30|
|  Zadie|  1|
+-------+---+



### Slicing

In [24]:
df.select("*").show()

+-------+---+
|   name|age|
+-------+---+
|  Seiji| 32|
|Stephie| 30|
|  Zadie|  1|
+-------+---+



In [28]:
df.select('name', 'age').show()

+-------+---+
|   name|age|
+-------+---+
|  Seiji| 32|
|Stephie| 30|
|  Zadie|  1|
+-------+---+



In [44]:
df.select('name', (df['age']+10).alias('age+10')).show()

+-------+------+
|   name|age+10|
+-------+------+
|  Seiji|    42|
|Stephie|    40|
|  Zadie|    11|
+-------+------+



In [43]:
df.select('name', (df['age']+10).alias('age+10'), (df['age']+100).alias('age+100')).show()

+-------+------+-------+
|   name|age+10|age+100|
+-------+------+-------+
|  Seiji|    42|    132|
|Stephie|    40|    130|
|  Zadie|    11|    101|
+-------+------+-------+



In [45]:
df.drop(df['age']).show()

+-------+
|   name|
+-------+
|  Seiji|
|Stephie|
|  Zadie|
+-------+



### Unions

In [12]:
data = [('Seiji', 1), ('Stephie', 2), ('Zadie', 1)]
data2 = [('Alice', 1), ('Bob', 2)]

df = spark.createDataFrame(data,['name','age'])
df2 = spark.createDataFrame(data2,['name','age'])

In [13]:
df.show()

+-------+---+
|   name|age|
+-------+---+
|  Seiji|  1|
|Stephie|  2|
|  Zadie|  1|
+-------+---+



In [14]:
df2.show()

+-----+---+
| name|age|
+-----+---+
|Alice|  1|
|  Bob|  2|
+-----+---+



In [15]:
df.union(df2).show()

+-------+---+
|   name|age|
+-------+---+
|  Seiji|  1|
|Stephie|  2|
|  Zadie|  1|
|  Alice|  1|
|    Bob|  2|
+-------+---+



### User defined function

In [56]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

slen = udf(lambda s: len(s), IntegerType())
df.select('name', slen(df['name']).alias('slen')).show()

+-------+----+
|   name|slen|
+-------+----+
|  Seiji|   5|
|Stephie|   7|
|  Zadie|   5|
+-------+----+



### Transformations
* filter
* distint()
* orderBy
* sort
* explode

### random pandas DataFrame

In [33]:
x,y = 6,10
panDf = pd.DataFrame(np.random.randint(100,size=(x,y)),\
                     columns=list(string.ascii_lowercase)[:y])
panDf

Unnamed: 0,a,b,c,d,e,f,g,h,i,j
0,99,41,58,5,0,85,38,96,0,61
1,54,34,88,99,64,11,66,37,70,67
2,89,1,80,89,51,8,97,2,90,38
3,25,53,13,35,76,92,91,86,11,87
4,35,2,32,66,69,96,49,47,59,40
5,97,61,15,25,94,0,64,35,36,26


### pandas to spark DataFrame

In [34]:
sqlc = SQLContext(sc)
sparkDf = sqlc.createDataFrame(panDf)

### Slicing a dataframe

In [35]:
sparkDf.show()

+---+---+---+---+---+---+---+---+---+---+
|  a|  b|  c|  d|  e|  f|  g|  h|  i|  j|
+---+---+---+---+---+---+---+---+---+---+
| 99| 41| 58|  5|  0| 85| 38| 96|  0| 61|
| 54| 34| 88| 99| 64| 11| 66| 37| 70| 67|
| 89|  1| 80| 89| 51|  8| 97|  2| 90| 38|
| 25| 53| 13| 35| 76| 92| 91| 86| 11| 87|
| 35|  2| 32| 66| 69| 96| 49| 47| 59| 40|
| 97| 61| 15| 25| 94|  0| 64| 35| 36| 26|
+---+---+---+---+---+---+---+---+---+---+



In [36]:
sparkDf.take(2)

[Row(a=99, b=41, c=58, d=5, e=0, f=85, g=38, h=96, i=0, j=61),
 Row(a=54, b=34, c=88, d=99, e=64, f=11, g=66, h=37, i=70, j=67)]

In [37]:
sparkDf.collect()

[Row(a=99, b=41, c=58, d=5, e=0, f=85, g=38, h=96, i=0, j=61),
 Row(a=54, b=34, c=88, d=99, e=64, f=11, g=66, h=37, i=70, j=67),
 Row(a=89, b=1, c=80, d=89, e=51, f=8, g=97, h=2, i=90, j=38),
 Row(a=25, b=53, c=13, d=35, e=76, f=92, g=91, h=86, i=11, j=87),
 Row(a=35, b=2, c=32, d=66, e=69, f=96, g=49, h=47, i=59, j=40),
 Row(a=97, b=61, c=15, d=25, e=94, f=0, g=64, h=35, i=36, j=26)]

In [38]:
sparkDf.select('a').show()

+---+
|  a|
+---+
| 99|
| 54|
| 89|
| 25|
| 35|
| 97|
+---+



In [39]:
sparkDf.select('a','b').take(2)

[Row(a=99, b=41), Row(a=54, b=34)]

In [40]:
sparkDf.select(['a','b']).take(2)

[Row(a=99, b=41), Row(a=54, b=34)]

In [41]:
sparkDf.columns

['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']

### LabeledPoint rows from DataFrame

In [35]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

In [52]:
featCols = ['a','b','c']

labeledData = (sparkDf.map(
        lambda row:
        LabeledPoint(row['h'],
                     Vectors.dense([row[el] for el in featCols])
                    ))
              )
print labeledData.take(1)

[LabeledPoint(17.0, [50.0,61.0,19.0])]


### LabeledPoint rows from DataFrame (method 2)

In [49]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
from pyspark.mllib.linalg import Vectors

In [53]:
assembler = VectorAssembler(
    inputCols=featCols,
    outputCol="features")

transformed = assembler.transform(sparkDf)

labeledData2 = (transformed.select(col("h").alias("label"), col("features"))
               .map(lambda row: LabeledPoint(row.label, row.features))
              )
labeledData2.take(1)

[LabeledPoint(17.0, [50.0,61.0,19.0])]

### Collecting distinct labels

In [57]:
group_labels = labeledData2.map(lambda x: x.label).distinct().collect()
print group_labels

[48.0, 16.0, 17.0, 66.0, 38.0, 31.0]


#### Adding column based on logic of multiple column values

In [30]:
from pyspark.sql import Row
# credit: Austin in sparknotes slack channel 08-26
def myFunc(row):
    rowDict = row.asDict()
    rowDict['d'] = 1 if rowDict['a'] < 0.5 or rowDict['b'] < 0.5 else 0
    return Row(**rowDict)

In [32]:
sparkDf2 = sparkDf.map(myFunc).toDF()

In [33]:
sparkDf2.show()

+-------------------+------------------+------------------+---+
|                  a|                 b|                 c|  d|
+-------------------+------------------+------------------+---+
| 0.6138643778719053|0.5633879152061312|0.9856763019891056|  0|
|0.45372068691084566|0.9069223437838317|0.3839384600692365|  1|
+-------------------+------------------+------------------+---+



In [3]:
data = [('Seiji', 32), ('Stephie', 30), ('Zadie', 1)]
df = spark.createDataFrame(data,['name','age'])
df.show()

+-------+---+
|   name|age|
+-------+---+
|  Seiji| 32|
|Stephie| 30|
|  Zadie|  1|
+-------+---+



### feeding in Row to function with *

In [20]:
from pyspark.sql import functions as sqlFunctions

sqlStats = df.agg(sqlFunctions.min(df['age']),
       sqlFunctions.max(df['age']),
       sqlFunctions.avg(df['age']))

sqlStats.show()

+--------+--------+--------+
|min(age)|max(age)|avg(age)|
+--------+--------+--------+
|       1|      32|    21.0|
+--------+--------+--------+



In [22]:
sqlStatsRow = sqlStats.first()
print sqlStatsRow
print 'min: {0}, average: {2}, max: {1}'.format(*sqlStatsRow)

Row(min(age)=1, max(age)=32, avg(age)=21.0)
min: 1, average: 21.0, max: 32


### select vs withColumn

In [25]:
df.select('age', sqlFunctions.log(df['age'])).show()

+---+------------------+
|age|          LOG(age)|
+---+------------------+
| 32|3.4657359027997265|
| 30|3.4011973816621555|
|  1|               0.0|
+---+------------------+



In [27]:
df.withColumn('log(age)',sqlFunctions.log(df['age'])).show()

+-------+---+------------------+
|   name|age|          log(age)|
+-------+---+------------------+
|  Seiji| 32|3.4657359027997265|
|Stephie| 30|3.4011973816621555|
|  Zadie|  1|               0.0|
+-------+---+------------------+



## extracting StringIndexer label mapping

In [None]:
labelIndexer = StringIndexer(inputCol="y", outputCol="indexedLabel").fit(signalDf)
pd.DataFrame([(el,idx) for idx,el in enumerate(labelIndexer.labels)],columns=['y','indexedLabel'])
labelMap = dict([(idx,el) for idx,el in enumerate(labelIndexer.labels)])