In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import (StructType, 
                               StructField, 
                               FloatType, 
                               IntegerType, 
                               StringType)
from pyspark.sql.functions import udf, avg, sum, count

import pandas as pd
from pprint import pprint
sc = SparkContext("local[*]", "My App Name")
sqlContext = SQLContext(sc)

In [2]:
row = Row(name='John', GPA=3.4)
print(row.name)
print(row.GPA)

John
3.4


In [3]:
data = [(2310,3.4,'CA',18), 
        (1173,3.9,'CA',19), 
        (20393,4.0,'DC',20), 
        (3841,3.4,'CA',19), 
        (39183,3.8,'DC',20)]

schema = StructType([StructField('SID',     IntegerType(),  True),
                     StructField('GPA',      FloatType(),   True),
                     StructField('province', StringType(),  True),
                     StructField('age',      IntegerType(), True)])

data2 = [(1173,'John'), 
        (2310, 'Alex'), 
        (39183,'Mike'), 
        (20393,'Jessica'), 
        (3841,'Eric'),
        (19328,'Luke')]

schema2 = StructType([StructField('SID',  IntegerType(),  True),
                      StructField('name', StringType(),   True)])
                     
                    
df = sqlContext.createDataFrame(data, schema)
df2 = sqlContext.createDataFrame(data2, schema2)
                      
df.show(4)
print(df.count())
pprint(df.collect())
df.describe().show()

+-----+---+--------+---+
|  SID|GPA|province|age|
+-----+---+--------+---+
| 2310|3.4|      CA| 18|
| 1173|3.9|      CA| 19|
|20393|4.0|      DC| 20|
| 3841|3.4|      CA| 19|
+-----+---+--------+---+
only showing top 4 rows

5
[Row(SID=2310, GPA=3.4000000953674316, province='CA', age=18),
 Row(SID=1173, GPA=3.9000000953674316, province='CA', age=19),
 Row(SID=20393, GPA=4.0, province='DC', age=20),
 Row(SID=3841, GPA=3.4000000953674316, province='CA', age=19),
 Row(SID=39183, GPA=3.799999952316284, province='DC', age=20)]
+-------+------------------+-------------------+------------------+
|summary|               SID|                GPA|               age|
+-------+------------------+-------------------+------------------+
|  count|                 5|                  5|                 5|
|   mean|           13380.0|  3.700000047683716|              19.2|
| stddev|16412.839851774585|0.28284267454246087|0.8366600265340756|
|    min|              1173|                3.4|                

In [4]:
normalize = udf(lambda x: 100*x/4.0, FloatType())
df_GPA_normalized = df.select(df.SID, normalize(df.GPA).alias('percentage'))
df_GPA_normalized.take(3)

[Row(SID=2310, percentage=85.0),
 Row(SID=1173, percentage=97.5),
 Row(SID=20393, percentage=100.0)]

In [5]:
df_divide = df.select(df.SID, (df.GPA/df.age).alias('ratio'))
df_divide.show()

+-----+-------------------+
|  SID|              ratio|
+-----+-------------------+
| 2310|0.18888889418707955|
| 1173|0.20526316291407534|
|20393|                0.2|
| 3841|0.17894737344039113|
|39183| 0.1899999976158142|
+-----+-------------------+



In [6]:
df_low_GPA = df.filter(df.GPA < 3.8).select(df.SID, df.GPA).limit(2)
pprint(df_low_GPA.collect())

[Row(SID=2310, GPA=3.4000000953674316), Row(SID=3841, GPA=3.4000000953674316)]


In [7]:
df_low_GPA_sorted = df_low_GPA.sort('GPA', ascending=False)
pprint([(x.SID, x.GPA) for x in df_low_GPA_sorted.collect()])

[(2310, 3.4000000953674316), (3841, 3.4000000953674316)]


In [8]:
df_province_avg = df.groupBy('province').agg({'GPA':'avg', 'SID':'count'})
panda_df = df_province_avg.toPandas()
panda_df.head()

Unnamed: 0,province,avg(GPA),count(SID)
0,DC,3.9,2
1,CA,3.566667,3


In [9]:
df_province_count = df.groupBy('province').agg(count('GPA').alias('count'))
df_province_count.show()

+--------+-----+
|province|count|
+--------+-----+
|      DC|    2|
|      CA|    3|
+--------+-----+



In [10]:
print(df.agg(avg('GPA').alias('avggpa')).first().avggpa)

3.700000047683716


In [11]:
df3 = df.join(df2, on='SID', how='outer')
df3.show()

+-----+----+--------+----+-------+
|  SID| GPA|province| age|   name|
+-----+----+--------+----+-------+
|19328|null|    null|null|   Luke|
| 3841| 3.4|      CA|  19|   Eric|
|39183| 3.8|      DC|  20|   Mike|
| 1173| 3.9|      CA|  19|   John|
|20393| 4.0|      DC|  20|Jessica|
| 2310| 3.4|      CA|  18|   Alex|
+-----+----+--------+----+-------+



In [12]:
df = sqlContext.read.text('textfile.txt')
df.collect()

[Row(value='1. This is the first line'),
 Row(value='2. This is the second line'),
 Row(value='3. another line'),
 Row(value='4. one more line'),
 Row(value='5. last line')]

In [13]:
panda_df = pd.DataFrame({'column1':[2,3,4], 'column2':[1.2,3,2]})
df = sqlContext.createDataFrame(panda_df)
df.collect()

[Row(column1=2, column2=1.2),
 Row(column1=3, column2=3.0),
 Row(column1=4, column2=2.0)]

In [14]:
slen = udf(lambda s: len(s), IntegerType())
df = sqlContext.read.text('textfile.txt')
new_df = df.select(slen(df.value).alias('slen'))
new_df.collect()

[Row(slen=25), Row(slen=26), Row(slen=15), Row(slen=16), Row(slen=12)]