In [1]:
sc

In [2]:
from pyspark.sql.types import Row
from datetime import datetime

In [3]:
simple_data = sc.parallelize([1, "Alice", 50])
simple_data

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

In [4]:
simple_data.count()

3

In [5]:
!python --version

Python 3.7.1


In [6]:
!java -version

java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)


In [7]:
simple_data.first()

1

In [8]:
simple_data.take(2)

[1, 'Alice']

In [9]:
simple_data.collect()

[1, 'Alice', 50]

In [10]:
df = simple_data.toDF() ## fails becuase data types are different

TypeError: Can not infer schema for type: <class 'int'>

In [11]:
records = sc.parallelize([[1, "Alice", 50], [2, "Bob", 80]])
records

ParallelCollectionRDD[8] at parallelize at PythonRDD.scala:195

In [12]:
records.collect()

[[1, 'Alice', 50], [2, 'Bob', 80]]

In [13]:
records.first()

[1, 'Alice', 50]

In [14]:
df = records.toDF()
df

DataFrame[_1: bigint, _2: string, _3: bigint]

In [15]:
df.show()

+---+-----+---+
| _1|   _2| _3|
+---+-----+---+
|  1|Alice| 50|
|  2|  Bob| 80|
+---+-----+---+



## Use Row if you want to associate column names with RDD

In [18]:
# Row means single record
data = sc.parallelize([Row(id=1,name="Alice",score=50),
                      Row(id=2,name="Bob",score=80),
                      Row(id=3,name="Charless",score=75)])
data.collect()

[Row(id=1, name='Alice', score=50),
 Row(id=2, name='Bob', score=80),
 Row(id=3, name='Charless', score=75)]

In [19]:
df = data.toDF()
df.show()

+---+--------+-----+
| id|    name|score|
+---+--------+-----+
|  1|   Alice|   50|
|  2|     Bob|   80|
|  3|Charless|   75|
+---+--------+-----+



In [20]:
### complex data Row supports all sorts of Data Dict as column
### Even nested row as column 
### Datetime as column
### List a clumn
complex_data= sc.parallelize([Row(
                             col_list = [1,2,3],
                             col_dict = {'k1':0},
                             col_row = Row(a=10, b=20 , c=30),
                             col_time = datetime(2014, 8, 1 , 14, 1 , 5)
                            ),
                            Row(
                             col_list = [1,2,3,4,5],
                             col_dict = {'k1':0, 'k2':1},
                             col_row = Row(a=10, b=20 , c=30),
                             col_time = datetime(2014, 8, 2 , 14, 1 , 6)
                            ),
                             Row(
                             col_list = [1,2,3,4,5,6,7],
                             col_dict = {'k1':0, 'k2':1, 'k3':2 },
                             col_row = Row(a=10, b=20 , c=30),
                             col_time = datetime(2014, 8, 3 , 14, 1 , 7)
                            )])
df=complex_data.toDF()
df.show()

+--------------------+--------------------+------------+-------------------+
|            col_dict|            col_list|     col_row|           col_time|
+--------------------+--------------------+------------+-------------------+
|           [k1 -> 0]|           [1, 2, 3]|[10, 20, 30]|2014-08-01 14:01:05|
|  [k1 -> 0, k2 -> 1]|     [1, 2, 3, 4, 5]|[10, 20, 30]|2014-08-02 14:01:06|
|[k3 -> 2, k1 -> 0...|[1, 2, 3, 4, 5, 6...|[10, 20, 30]|2014-08-03 14:01:07|
+--------------------+--------------------+------------+-------------------+



### SQL Context 

In [21]:
sqlContext = SQLContext(sc)

In [22]:
sqlContext

<pyspark.sql.context.SQLContext at 0x117ed8860>

In [23]:
df = sqlContext.range(5)
df
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [24]:
df.count()

5

In [26]:
## Creating DataFrames directly from input Data, no conversion from RDD to DF
data = [('Alice', 50),
        ('Bob', 80),
        ('Charleee', 75)]

## Creating DF from SQLContext is easier , no need for RDD
## Creating DF from Data Directly , using createDataDrame Function

In [27]:
sqlContext.createDataFrame(data).show()

+--------+---+
|      _1| _2|
+--------+---+
|   Alice| 50|
|     Bob| 80|
|Charleee| 75|
+--------+---+



In [29]:
sqlContext.createDataFrame(data,['Name', 'Score']).show()

+--------+-----+
|    Name|Score|
+--------+-----+
|   Alice|   50|
|     Bob|   80|
|Charleee|   75|
+--------+-----+



In [33]:
complex_data = [
                (1.0,
                10,
                "Alice",
                 True,
                [1,2,3],
                {"k1": 0},
                Row(a=1, b=2, c=3),
                datetime(2014, 8, 3 , 14, 1 , 5)),
                
                (2.0,
                20,
                "Bob",
                 True,
                [1,2,3],
                {"k1": 0,"k2":1},
                Row(a=1, b=2, c=3),
                datetime(2014, 8, 3 , 14, 1 , 6)),
    
                (3.0,
                30,
                "Charlee",
                 False,
                [1,2,3],
                {"k1": 0, "k2":1 , "k3":1},
                Row(a=1, b=2, c=3),
                datetime(2014, 8, 3 , 14, 1 , 7))
                ]

In [34]:
complex_data_df = sqlContext.createDataFrame(complex_data,
                                            ['col_integer',
                                             'col_float',
                                             'col_string',
                                             'col_boolean',
                                             'col_list',
                                             'col_dictionary',
                                             'col_row',
                                             'col_date_time'])
complex_data_df.show()

+-----------+---------+----------+-----------+---------+--------------------+---------+-------------------+
|col_integer|col_float|col_string|col_boolean| col_list|      col_dictionary|  col_row|      col_date_time|
+-----------+---------+----------+-----------+---------+--------------------+---------+-------------------+
|        1.0|       10|     Alice|       true|[1, 2, 3]|           [k1 -> 0]|[1, 2, 3]|2014-08-03 14:01:05|
|        2.0|       20|       Bob|       true|[1, 2, 3]|  [k1 -> 0, k2 -> 1]|[1, 2, 3]|2014-08-03 14:01:06|
|        3.0|       30|   Charlee|      false|[1, 2, 3]|[k3 -> 1, k1 -> 0...|[1, 2, 3]|2014-08-03 14:01:07|
+-----------+---------+----------+-----------+---------+--------------------+---------+-------------------+



### Another way of specifying Rows

In [35]:
data = sc.parallelize([
    Row(1, "Alice", 50),
    Row(2, "Bob" , 80),
    Row(3, "Charlee", 75)
])

## Giving Column names later

### Perform a map operation on each RDD to
### assign name to columns

In [39]:
column_names= Row('id', 'name' , 'score')
students = data.map(lambda r: column_names(*r)) ## map operates on every element 

In [41]:
students.collect()### feild names assigned to row object

[Row(id=1, name='Alice', score=50),
 Row(id=2, name='Bob', score=80),
 Row(id=3, name='Charlee', score=75)]

In [44]:
#### creating a DataFrame for RDD
students_df = sqlContext.createDataFrame(students)
students_df ## Notice type of each column is infered correctly

DataFrame[id: bigint, name: string, score: bigint]

## Back to Complex DF

In [45]:
complex_data_df.first()

Row(col_integer=1.0, col_float=10, col_string='Alice', col_boolean=True, col_list=[1, 2, 3], col_dictionary={'k1': 0}, col_row=Row(a=1, b=2, c=3), col_date_time=datetime.datetime(2014, 8, 3, 14, 1, 5))

In [47]:
### DF is a tabular form and you access it as individual cells
cell_string = complex_data_df.collect()[0][2]
cell_string

'Alice'

In [51]:
cell_list = complex_data_df.collect()[0][4] ## returns a copy not actual 
print(cell_list)
## modifying list doesn't affect DF
cell_list.append(100)
complex_data_df.first()

[1, 2, 3]


Row(col_integer=1.0, col_float=10, col_string='Alice', col_boolean=True, col_list=[1, 2, 3], col_dictionary={'k1': 0}, col_row=Row(a=1, b=2, c=3), col_date_time=datetime.datetime(2014, 8, 3, 14, 1, 5))

# Every DataFrame Stores the RDD equivalent of the records in the variable rdd

In [53]:
complex_data_df.rdd\
               .map(lambda x: (x.col_string , x.col_dictionary))\
               .collect()

[('Alice', {'k1': 0}),
 ('Bob', {'k1': 0, 'k2': 1}),
 ('Charlee', {'k3': 1, 'k1': 0, 'k2': 1})]

In [54]:
### Getting column using select 
complex_data_df.select('col_string','col_list','col_date_time').show()

+----------+---------+-------------------+
|col_string| col_list|      col_date_time|
+----------+---------+-------------------+
|     Alice|[1, 2, 3]|2014-08-03 14:01:05|
|       Bob|[1, 2, 3]|2014-08-03 14:01:06|
|   Charlee|[1, 2, 3]|2014-08-03 14:01:07|
+----------+---------+-------------------+



In [58]:
## Adding a new column to DF, Some things you can do using select
## This returns a Data Frame
complex_data_df.select('col_integer','col_float')\
                .withColumn("col_sum", complex_data_df.col_integer + complex_data_df.col_float)\
                .show()
                

+-----------+---------+-------+
|col_integer|col_float|col_sum|
+-----------+---------+-------+
|        1.0|       10|   11.0|
|        2.0|       20|   22.0|
|        3.0|       30|   33.0|
+-----------+---------+-------+



## Following operations create new DF

In [59]:
complex_data_df.withColumnRenamed('col_dictionary', 'col_map').show() ## New DF with renamed column

+-----------+---------+----------+-----------+---------+--------------------+---------+-------------------+
|col_integer|col_float|col_string|col_boolean| col_list|             col_map|  col_row|      col_date_time|
+-----------+---------+----------+-----------+---------+--------------------+---------+-------------------+
|        1.0|       10|     Alice|       true|[1, 2, 3]|           [k1 -> 0]|[1, 2, 3]|2014-08-03 14:01:05|
|        2.0|       20|       Bob|       true|[1, 2, 3]|  [k1 -> 0, k2 -> 1]|[1, 2, 3]|2014-08-03 14:01:06|
|        3.0|       30|   Charlee|      false|[1, 2, 3]|[k3 -> 1, k1 -> 0...|[1, 2, 3]|2014-08-03 14:01:07|
+-----------+---------+----------+-----------+---------+--------------------+---------+-------------------+



In [60]:
complex_data_df.select(complex_data_df.col_string.alias("Name")).show() ## new 1 Column DF

+-------+
|   Name|
+-------+
|  Alice|
|    Bob|
|Charlee|
+-------+



## Spark DF can be converted to Pandas DF
## Spark DF are built on RDD and distributed across multiple nodes
## Where as Pandas DF are in memory on a single machine

In [61]:
import pandas

In [62]:
df_pandas = complex_data_df.toPandas()
df_pandas

Unnamed: 0,col_integer,col_float,col_string,col_boolean,col_list,col_dictionary,col_row,col_date_time
0,1.0,10,Alice,True,"[1, 2, 3]",{'k1': 0},"(1, 2, 3)",2014-08-03 14:01:05
1,2.0,20,Bob,True,"[1, 2, 3]","{'k1': 0, 'k2': 1}","(1, 2, 3)",2014-08-03 14:01:06
2,3.0,30,Charlee,False,"[1, 2, 3]","{'k3': 1, 'k1': 0, 'k2': 1}","(1, 2, 3)",2014-08-03 14:01:07


## Creating Spark DF from Pandas DF using SQLContext

In [63]:
df_spark = sqlContext.createDataFrame(df_pandas).show()
df_spark

+-----------+---------+----------+-----------+---------+--------------------+---------+-------------------+
|col_integer|col_float|col_string|col_boolean| col_list|      col_dictionary|  col_row|      col_date_time|
+-----------+---------+----------+-----------+---------+--------------------+---------+-------------------+
|        1.0|       10|     Alice|       true|[1, 2, 3]|           [k1 -> 0]|[1, 2, 3]|2014-08-03 14:01:05|
|        2.0|       20|       Bob|       true|[1, 2, 3]|  [k1 -> 0, k2 -> 1]|[1, 2, 3]|2014-08-03 14:01:06|
|        3.0|       30|   Charlee|      false|[1, 2, 3]|[k3 -> 1, k1 -> 0...|[1, 2, 3]|2014-08-03 14:01:07|
+-----------+---------+----------+-----------+---------+--------------------+---------+-------------------+

