# Introduction

## Programming with RDDs

### Create RDD

#### By using parallelize( ) function

In [1]:
# findspark.init() is called before importing pyspark so the import below can
# resolve against the local Spark installation.
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Build the session (or reuse one that already exists) that the following
# cells access through the name `spark`. The config entry is a placeholder
# key/value pair.
spark = (
    SparkSession.builder
    .appName("Python Spark create RDD example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Distribute three 4-field tuples as an RDD, then turn the RDD into a
# DataFrame by handing toDF() one column name per tuple field.
sample_rdd = spark.sparkContext.parallelize([
    (1, 2, 3, 'a b c'),
    (4, 5, 6, 'd e f'),
    (7, 8, 9, 'g h i'),
])
df = sample_rdd.toDF(['col1', 'col2', 'col3', 'col4'])

df.show()

+----+----+----+-----+
|col1|col2|col3| col4|
+----+----+----+-----+
|   1|   2|   3|a b c|
|   4|   5|   6|d e f|
|   7|   8|   9|g h i|
+----+----+----+-----+



In [2]:
# Turn a plain Python list of pairs into an RDD. collect() brings every
# element back as a Python list, which is displayed as the cell's result.
number_pairs = [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
myData = spark.sparkContext.parallelize(number_pairs)
myData.collect()

[(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]

#### By using createDataFrame( ) function

In [3]:
# Row data and column names are passed straight to createDataFrame(); no RDD
# is built explicitly. Every field, including the numeric-looking ones, is a
# Python string here.
# NOTE(review): 'Sallary' looks like a misspelling of 'Salary'; it is kept
# as-is because other cells may reference the column by this name.
employee_rows = [
    ('1', 'Joe', '70000', '1'),
    ('2', 'Henry', '80000', '2'),
    ('3', 'Sam', '60000', '2'),
    ('4', 'Max', '90000', '1'),
]
employee_columns = ['Id', 'Name', 'Sallary', 'DepartmentId']
Employee = spark.createDataFrame(employee_rows, employee_columns)

Employee.show()

+---+-----+-------+------------+
| Id| Name|Sallary|DepartmentId|
+---+-----+-------+------------+
|  1|  Joe|  70000|           1|
|  2|Henry|  80000|           2|
|  3|  Sam|  60000|           2|
|  4|  Max|  90000|           1|
+---+-----+-------+------------+



#### By using read and load functions

##### Read dataset from .csv file

In [4]:
# Load the advertising CSV with Spark's built-in CSV reader.
#   header=True      -> take column names from the first line of the file
#   inferSchema=True -> infer column types from the data instead of reading
#                       every column as a string
# This replaces the legacy 'com.databricks.spark.csv' format name (the external
# Spark 1.x package) and the `header` option that was being set twice.
df = spark.read.csv("./data/Advertising.csv", header=True, inferSchema=True)

# Preview the first five rows, then print the resulting column types.
df.show(5)
df.printSchema()

+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows

root
 |-- _c0: integer (nullable = true)
 |-- TV: double (nullable = true)
 |-- Radio: double (nullable = true)
 |-- Newspaper: double (nullable = true)
 |-- Sales: double (nullable = true)



##### Read dataset from database

Note: Reading tables from a database requires the proper JDBC driver for that database. For example,
the demo below needs org.postgresql.Driver, which you must download and put in the jars folder
of your Spark installation path. I downloaded postgresql-42.1.1.jar from the official website and put
it in the jars folder.

In [5]:
# Template for reading a PostgreSQL table over JDBC. It is left commented out
# because it needs a reachable database and the PostgreSQL JDBC driver jar.
# Replace the placeholders before uncommenting, and do not commit real
# credentials to the notebook.
#
# ## User information
# user = 'your_username'
# pw = 'your_password'
# ## Database information
# table_name = 'table_name'
# url = 'jdbc:postgresql://##.###.###.##:5432/dataset?user='+user+'&password='+pw
# properties ={'driver': 'org.postgresql.Driver', 'password': pw,'user': user}
# df = spark.read.jdbc(url=url, table=table_name, properties=properties)
# df.show(5)
# df.printSchema()

##### Read dataset from HDFS

In [6]:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import HiveContext

# Obtain a SparkContext via getOrCreate() and wrap it in a HiveContext.
# In this cell `hc` is referenced only by the commented-out Hive query at the
# bottom.
sc = SparkContext.getOrCreate()
hc = HiveContext(sc)

# Read the file as an RDD of text lines and print its first line.
# NOTE(review): the path is a local relative path; presumably an hdfs:// URI
# would be used here when reading from an actual HDFS cluster -- confirm.
tf1 = sc.textFile("./data/Advertising.csv")
first_line = tf1.first()
print(first_line)

# hc.sql("use root")
# spf = hc.sql("SELECT * FROM ads LIMIT 100")
# print(spf.show(5))

,TV,Radio,Newspaper,Sales


### rdd.DataFrame vs pd.DataFrame

#### Create DataFrame

##### From List

In [7]:
import pandas as pd

# The same nested list (one inner list per row) and the same column names are
# fed to both libraries so the two constructors can be compared side by side.
my_list = [['a', 1, 2], ['b', 2, 3], ['c', 3, 4]]
col_name = ['A', 'B', 'C']

print('pandas')
print(pd.DataFrame(my_list, columns=col_name))

print('spark')
# DataFrame.show() prints the table itself and returns None, so it must not be
# wrapped in print() -- doing so emits a stray "None" line after the table.
spark.createDataFrame(my_list, col_name).show()

pandas
   A  B  C
0  a  1  2
1  b  2  3
2  c  3  4
spark
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  1|  2|
|  b|  2|  3|
|  c|  3|  4|
+---+---+---+

None


##### From Dict

In [8]:
import numpy as np  # not used by this cell any more; retained in case other cells rely on `np`

# Column-oriented data: each key is a column name, each value is that
# column's list of entries.
d = {'A': [0, 1, 0],
     'B': [1, 0, 1],
     'C': [1, 0, 0]}

print('pandas')
print(pd.DataFrame(d))

# pandas accepts a dict of columns directly; here the data is passed to
# createDataFrame() as a list of rows, so the columns are transposed first.
# zip(*columns) does the transpose while leaving each value's Python type
# untouched, whereas a round-trip through np.array() would coerce all columns
# to a single dtype (e.g. everything becomes a string if any column holds
# strings). zip() stops at the shortest column, so all columns must have the
# same length.
print('spark')
rows = list(zip(*d.values()))
spark.createDataFrame(rows, list(d.keys())).show()

pandas
   A  B  C
0  0  1  1
1  1  0  0
2  0  1  0
spark
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  0|  1|  1|
|  1|  0|  0|
|  0|  1|  0|
+---+---+---+

