In [2]:
from pyspark.sql import SparkSession

### create RDD

In [7]:
spark = SparkSession \
    .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.sparkContext.parallelize([(1, 2, 3, 'a b c'),
             (4, 5, 6, 'd e f'),
             (7, 8, 9, 'g h i')]).toDF(['col1', 'col2', 'col3','col4'])

In [8]:
df.show()

+----+----+----+-----+
|col1|col2|col3| col4|
+----+----+----+-----+
|   1|   2|   3|a b c|
|null|   5|   6|d e f|
|   7|null|   9|g h i|
+----+----+----+-----+



In [14]:
myData = spark.sparkContext.parallelize([(1,2), (3,4), (5,6), (7,8), (9,10)])
print(myData.collect())
type(myData)

[(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]


pyspark.rdd.RDD

In [12]:
# By using createDataFrame( ) function
Employee = spark.createDataFrame([
                        ('1', 'Joe',   '70000', '1'),
                        ('2', 'Henry', '80000', '2'),
                        ('3', 'Sam',   '60000', '2'),
                        ('4', 'Max',   '90000', '1')],
                        ['Id', 'Name', 'Sallary','DepartmentId']
                       )
Employee.show()

+---+-----+-------+------------+
| Id| Name|Sallary|DepartmentId|
+---+-----+-------+------------+
|  1|  Joe|  70000|           1|
|  2|Henry|  80000|           2|
|  3|  Sam|  60000|           2|
|  4|  Max|  90000|           1|
+---+-----+-------+------------+



In [25]:
# By using read and load functions,  or using format('csv')
# from csv
df = spark.read.format('com.databricks.spark.csv').\
                load("Advertising.csv",header=True)
df.show(5)
df.printSchema()

+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows

root
 |-- _c0: string (nullable = true)
 |-- TV: string (nullable = true)
 |-- Radio: string (nullable = true)
 |-- Newspaper: string (nullable = true)
 |-- Sales: string (nullable = true)



In [None]:
# Read dataset from DataBase
## User information
user = 'your_username'
pw   = 'your_password'

## Database information
table_name = 'table_name'
url = 'jdbc:postgresql://##.###.###.##:5432/dataset?user='+user+'&password='+pw
properties ={'driver': 'org.postgresql.Driver', 'password': pw,'user': user}

df = spark.read.jdbc(url=url, table=table_name, properties=properties)

df.show(5)
df.printSchema()

In [None]:
# Read dataset from HDFS
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import HiveContext

sc= SparkContext('local','example')
hc = HiveContext(sc)
tf1 = sc.textFile("hdfs://cdhstltest/user/data/demo.CSV")
print(tf1.first())

hc.sql("use intg_cme_w")
spf = hc.sql("SELECT * FROM spf LIMIT 100")
print(spf.show(5))

## Spark Operations
### transformation and actions please see contect notebook

## DataFrames

In [10]:
import pandas as pd
my_list = [['a', 1, 2], ['b', 2, 3],['c', 3, 4]]
col_name = ['A', 'B', 'C']
# caution for the columns=
print(pd.DataFrame(my_list,columns= col_name).head())
spark.createDataFrame(my_list, col_name).show()

   A  B  C
0  a  1  2
1  b  2  3
2  c  3  4
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  1|  2|
|  b|  2|  3|
|  c|  3|  4|
+---+---+---+



In [34]:
# from dictionary
d = {'A': [0, 1, 0],
     'B': [1, 0, 1],
     'C': [1, 0, 0]}
dp=pd.DataFrame(d)
print(pd_df.head())
# Tedious for PySpark
ds=spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys()))
ds.show()

   A  B  C
0  0  1  1
1  1  0  0
2  0  1  0
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  0|  1|  1|
|  1|  0|  0|
|  0|  1|  0|
+---+---+---+



In [None]:
# from json
dp = pd.read_json("data/data.json")
ds = spark.read.json('data/data.json')

In [40]:
# dataframe operation
my_list = [['male', 1, None], ['female', 2, 3],['male', 3, 4]]
dp = pd.DataFrame(my_list,columns=['A', 'B', 'C'])
ds = spark.createDataFrame(my_list, ['A', 'B', 'C'])
ds.columns
ds.show()
ds.dtypes
ds.fillna(-99)
ds.na.replace(['male','female'],['1','0']).show()  #replace

+------+---+----+
|     A|  B|   C|
+------+---+----+
|  male|  1|null|
|female|  2|   3|
|  male|  3|   4|
+------+---+----+

+---+---+----+
|  A|  B|   C|
+---+---+----+
|  1|  1|null|
|  0|  2|   3|
|  1|  3|   4|
+---+---+----+



In [44]:
# rename a column name
ds1=ds.withColumnRenamed('C','CC')
ds1.show()

+------+---+----+
|     A|  B|  CC|
+------+---+----+
|  male|  1|null|
|female|  2|   3|
|  male|  3|   4|
+------+---+----+



In [45]:
# drop columns, ds does not change
drop_name = ['B','C']
ds.drop(*drop_name).show(4)

+------+
|     A|
+------+
|  male|
|female|
|  male|
+------+



In [47]:
# filter
ds = spark.read.csv(path='Advertising.csv',
                    header=True,
                    inferSchema=True)
ds[ds.Newspaper<20].show(4)

+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  8|120.2| 19.6|     11.6| 13.2|
|  9|  8.6|  2.1|      1.0|  4.8|
| 12|214.7| 24.0|      4.0| 17.4|
| 14| 97.5|  7.6|      7.2|  9.7|
+---+-----+-----+---------+-----+
only showing top 4 rows



In [57]:
# ceate a new column
import pyspark.sql.functions as F
ds.groupBy().agg(F.sum("TV")).collect()[0][0]
ds.withColumn('tv_norm', ds.TV/ds.groupBy().agg(F.sum("TV")).collect()[0][0]).show(4)
ds.withColumn('log_tv',F.log(ds.TV)).show(4)
ds.withColumn('tv+10', ds.TV+10).show(4)

+---+-----+-----+---------+-----+--------------------+
|_c0|   TV|Radio|Newspaper|Sales|             tv_norm|
+---+-----+-----+---------+-----+--------------------+
|  1|230.1| 37.8|     69.2| 22.1|0.007824268493802813|
|  2| 44.5| 39.3|     45.1| 10.4|0.001513167961643...|
|  3| 17.2| 45.9|     69.3|  9.3|5.848649200061207E-4|
|  4|151.5| 41.3|     58.5| 18.5|0.005151571824472517|
+---+-----+-----+---------+-----+--------------------+
only showing top 4 rows

+---+-----+-----+---------+-----+------------------+
|_c0|   TV|Radio|Newspaper|Sales|            log_tv|
+---+-----+-----+---------+-----+------------------+
|  1|230.1| 37.8|     69.2| 22.1|  5.43851399704132|
|  2| 44.5| 39.3|     45.1| 10.4|3.7954891891721947|
|  3| 17.2| 45.9|     69.3|  9.3|2.8449093838194073|
|  4|151.5| 41.3|     58.5| 18.5| 5.020585624949423|
+---+-----+-----+---------+-----+------------------+
only showing top 4 rows



In [60]:
# dataframe join
leftp = pd.DataFrame({'A': ['A0', 'A1', 'A2', 'A3'],
                    'B': ['B0', 'B1', 'B2', 'B3'],
                    'C': ['C0', 'C1', 'C2', 'C3'],
                    'D': ['D0', 'D1', 'D2', 'D3']},
                    index=[0, 1, 2, 3])

rightp = pd.DataFrame({'A': ['A0', 'A1', 'A6', 'A7'],
                       'F': ['B4', 'B5', 'B6', 'B7'],
                       'G': ['C4', 'C5', 'C6', 'C7'],
                       'H': ['D4', 'D5', 'D6', 'D7']},
                       index=[4, 5, 6, 7])

lefts = spark.createDataFrame(leftp)
rights = spark.createDataFrame(rightp)
lefts.show()
rights.show()

#left join
leftp.merge(rightp,on='A',how='left')

lefts.join(rights,on='A',how='left').orderBy('A',ascending=True).show()

#right join
leftp.merge(rightp,on='A',how='right')
lefts.join(rights,on='A',how='right').orderBy('A',ascending=True).show()

#inner join
leftp.merge(rightp,on='A',how='inner')
lefts.join(rights,on='A',how='inner').orderBy('A',ascending=True).show()

#full join
leftp.merge(rightp,on='A',how='outer')
lefts.join(rights,on='A',how='full').orderBy('A',ascending=True).show()

+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
| A0| B0| C0| D0|
| A1| B1| C1| D1|
| A2| B2| C2| D2|
| A3| B3| C3| D3|
+---+---+---+---+

+---+---+---+---+
|  A|  F|  G|  H|
+---+---+---+---+
| A0| B4| C4| D4|
| A1| B5| C5| D5|
| A6| B6| C6| D6|
| A7| B7| C7| D7|
+---+---+---+---+

+---+---+---+---+----+----+----+
|  A|  B|  C|  D|   F|   G|   H|
+---+---+---+---+----+----+----+
| A0| B0| C0| D0|  B4|  C4|  D4|
| A1| B1| C1| D1|  B5|  C5|  D5|
| A2| B2| C2| D2|null|null|null|
| A3| B3| C3| D3|null|null|null|
+---+---+---+---+----+----+----+

+---+----+----+----+---+---+---+
|  A|   B|   C|   D|  F|  G|  H|
+---+----+----+----+---+---+---+
| A0|  B0|  C0|  D0| B4| C4| D4|
| A1|  B1|  C1|  D1| B5| C5| D5|
| A6|null|null|null| B6| C6| D6|
| A7|null|null|null| B7| C7| D7|
+---+----+----+----+---+---+---+



In [62]:
# concat columns
my_list = [('a', 2, 3),
           ('b', 5, 6),
           ('c', 8, 9),
           ('a', 2, 3),
           ('b', 5, 6),
           ('c', 8, 9)]
col_name = ['col1', 'col2', 'col3']
dp = pd.DataFrame(my_list,columns=col_name)
ds = spark.createDataFrame(my_list,schema=col_name)
dp['concat'] = dp.apply(lambda x:'%s%s'%(x['col1'],x['col2']),axis=1)
print(dp.head())
ds.withColumn('concat',F.concat('col1','col2')).show()

  col1  col2  col3 concat
0    a     2     3     a2
1    b     5     6     b5
2    c     8     9     c8
3    a     2     3     a2
4    b     5     6     b5
+----+----+----+------+
|col1|col2|col3|concat|
+----+----+----+------+
|   a|   2|   3|    a2|
|   b|   5|   6|    b5|
|   c|   8|   9|    c8|
|   a|   2|   3|    a2|
|   b|   5|   6|    b5|
|   c|   8|   9|    c8|
+----+----+----+------+



In [63]:
# groupby
dp.groupby(['col1']).agg({'col2':'min','col3':'mean'})
ds.groupBy(['col1']).agg({'col2': 'min', 'col3': 'avg'}).show()

+----+---------+---------+
|col1|min(col2)|avg(col3)|
+----+---------+---------+
|   c|        8|      9.0|
|   b|        5|      6.0|
|   a|        2|      3.0|
+----+---------+---------+



In [64]:
#pivot table
pd.pivot_table(dp, values='col3', index='col1', columns='col2', aggfunc=np.sum)
ds.groupBy(['col1']).pivot('col2').sum('col3').show()

+----+----+----+----+
|col1|   2|   5|   8|
+----+----+----+----+
|   c|null|null|  18|
|   b|null|  12|null|
|   a|   6|null|null|
+----+----+----+----+



In [67]:
# window
d = {'A':['a','b','c','d'],'B':['m','m','n','n'],'C':[1,2,3,6]}
dp = pd.DataFrame(d)
ds = spark.createDataFrame(dp)
dp['rank'] = dp.groupby('B')['C'].rank('dense',ascending=False)
print(dp.head())
from pyspark.sql.window import Window
w = Window.partitionBy('B').orderBy(ds.C.desc())
ds = ds.withColumn('rank',F.rank().over(w))
ds.show()

   A  B  C  rank
0  a  m  1   2.0
1  b  m  2   1.0
2  c  n  3   2.0
3  d  n  6   1.0
+---+---+---+----+
|  A|  B|  C|rank|
+---+---+---+----+
|  b|  m|  2|   1|
|  a|  m|  1|   2|
|  d|  n|  6|   1|
|  c|  n|  3|   2|
+---+---+---+----+



In [69]:
# rank vs dense_rank
d ={'Id':[1,2,3,4,5,6],
    'Score': [4.00, 4.00, 3.85, 3.65, 3.65, 3.50]}
#
data = pd.DataFrame(d)
dp = data.copy()
ds = spark.createDataFrame(data)
dp['Rank_dense'] = dp['Score'].rank(method='dense',ascending =False)
dp['Rank'] = dp['Score'].rank(method='min',ascending =False)
print(dp.head())
#
import pyspark.sql.functions as F
from pyspark.sql.window import Window
w = Window.orderBy(ds.Score.desc())
ds = ds.withColumn('Rank_spark_dense',F.dense_rank().over(w))
ds = ds.withColumn('Rank_spark',F.rank().over(w))
ds.show()

   Id  Score  Rank_dense  Rank
0   1   4.00         1.0   1.0
1   2   4.00         1.0   1.0
2   3   3.85         2.0   3.0
3   4   3.65         3.0   4.0
4   5   3.65         3.0   4.0
+---+-----+----------------+----------+
| Id|Score|Rank_spark_dense|Rank_spark|
+---+-----+----------------+----------+
|  1|  4.0|               1|         1|
|  2|  4.0|               1|         1|
|  3| 3.85|               2|         3|
|  4| 3.65|               3|         4|
|  5| 3.65|               3|         4|
|  6|  3.5|               4|         6|
+---+-----+----------------+----------+



## data manipulation

In [82]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0, "Python python Spark Spark"),
    (1, "Python SQL")],
 ["document", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
vectorizer  = CountVectorizer(inputCol="words", outputCol="rawFeatures")

idf = IDF(inputCol="rawFeatures", outputCol="features")

pipeline = Pipeline(stages=[tokenizer, vectorizer, idf])

model = pipeline.fit(sentenceData)

In [83]:
import numpy as np

total_counts = model.transform(sentenceData)\
                    .select('rawFeatures').rdd\
                    .map(lambda row: row['rawFeatures'].toArray())\
                    .reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])

vocabList = model.stages[1].vocabulary
d = {'vocabList':vocabList,'counts':total_counts}

spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).show()

+---------+------+
|vocabList|counts|
+---------+------+
|   python|   3.0|
|    spark|   2.0|
|      sql|   1.0|
+---------+------+

