In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark import sql

conf = SparkConf().setAppName("Introduction").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = sql.SQLContext(sc)

In [2]:
a = sc.parallelize([[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]]).toDF()
a.show()

+---+---+
| _1| _2|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
|  4|  d|
|  5|  e|
+---+---+



# Convert RDD to Dataframe 

In [3]:
records = sc.parallelize([[1, "Alice", 50], [2, "Bob", 80]])
records

ParallelCollectionRDD[8] at readRDDFromFile at PythonRDD.scala:262

In [4]:
df= records.toDF()
df

DataFrame[_1: bigint, _2: string, _3: bigint]

In [5]:
df.show()

+---+-----+---+
| _1|   _2| _3|
+---+-----+---+
|  1|Alice| 50|
|  2|  Bob| 80|
+---+-----+---+



# Creation of Dataframe

In [6]:
from pyspark.sql.types import Row
from datetime import datetime

#### With Row function

In [13]:
data = sc.parallelize([Row(id=10,name="Alice",score=50)])
df1=data.toDF()
df1.show()

+---+-----+-----+
| id| name|score|
+---+-----+-----+
| 10|Alice|   50|
+---+-----+-----+



#### Row function with Multiple rows

In [8]:
datas = sc.parallelize([Row(id=10,name="Alice",score=50),
                        Row(id=20,name="Bob",score=30),
                        Row(id=30,name="Job",score=60)]) 
df2=datas.toDF()
df2.show()

+---+-----+-----+
| id| name|score|
+---+-----+-----+
| 10|Alice|   50|
| 20|  Bob|   30|
| 30|  Job|   60|
+---+-----+-----+



#### Multiple rows with complex data types

In [9]:
complex_data = sc.parallelize([Row(
                                col_float=1.44,
                                col_integer=10,
                                col_string="John")
                           ])
df3=complex_data.toDF()
df3.show()

+---------+-----------+----------+
|col_float|col_integer|col_string|
+---------+-----------+----------+
|     1.44|         10|      John|
+---------+-----------+----------+



In [10]:
complex_data = sc.parallelize([Row(
                                col_float=1.44, 
                                col_integer=10, 
                                col_string="John", 
                                col_boolean=True, 
                                col_list=[1, 2, 3])
                           ])
df4=complex_data.toDF()
df4.show()

+---------+-----------+----------+-----------+---------+
|col_float|col_integer|col_string|col_boolean| col_list|
+---------+-----------+----------+-----------+---------+
|     1.44|         10|      John|       true|[1, 2, 3]|
+---------+-----------+----------+-----------+---------+



In [11]:
complex_data = sc.parallelize([Row(
                                col_list = [1, 2, 3], 
                                col_dict = {"k1": 0, "k2": 1, "k3": 2}, 
                                col_row = Row(columnA = 10, columnB = 20, columnC = 30), 
                                col_time = datetime(2014, 8, 1, 14, 1, 5)
                            )])
df5=complex_data.toDF()
df5.show()

+---------+--------------------+------------+-------------------+
| col_list|            col_dict|     col_row|           col_time|
+---------+--------------------+------------+-------------------+
|[1, 2, 3]|[k3 -> 2, k1 -> 0...|[10, 20, 30]|2014-08-01 14:01:05|
+---------+--------------------+------------+-------------------+



In [14]:
complex_data = sc.parallelize([Row(
                                col_list = [1, 2, 3],
                                col_dict = {"k1": 0},
                                col_row = Row(a=10, b=20, c=30),
                                col_time = datetime(2014, 8, 1, 14, 1, 5)
                            ),              
                            Row(
                                col_list = [1, 2, 3, 4, 5], 
                                col_dict = {"k1": 0,"k2": 1 }, 
                                col_row = Row(a=40, b=50, c=60),
                                col_time = datetime(2014, 8, 2, 14, 1, 6)
                            ),
                            Row(
                                col_list = [1, 2, 3, 4, 5, 6, 7], 
                                col_dict = {"k1": 0, "k2": 1, "k3": 2 }, 
                                col_row = Row(a=70, b=80, c=90),
                                col_time = datetime(2014, 8, 3, 14, 1, 7)
                            )])
complex_data_df= complex_data.toDF()
complex_data_df.show()

+--------------------+--------------------+------------+-------------------+
|            col_list|            col_dict|     col_row|           col_time|
+--------------------+--------------------+------------+-------------------+
|           [1, 2, 3]|           [k1 -> 0]|[10, 20, 30]|2014-08-01 14:01:05|
|     [1, 2, 3, 4, 5]|  [k1 -> 0, k2 -> 1]|[40, 50, 60]|2014-08-02 14:01:06|
|[1, 2, 3, 4, 5, 6...|[k3 -> 2, k1 -> 0...|[70, 80, 90]|2014-08-03 14:01:07|
+--------------------+--------------------+------------+-------------------+



#### Creating DataFrames using SQLContext
#### SQLContext can create dataframes directly from raw data

In [15]:
sqlContext = sql.SQLContext(sc)
sqlContext

<pyspark.sql.context.SQLContext at 0x10d1df0d0>

In [18]:
df_sqlContext = sqlContext.range(10)
df_sqlContext.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [19]:
df_sqlContext.count()

10

#### Dataframe using createDataFrame()

In [21]:
data = [('Alice', 50),
        ('Bob', 80),
        ('Charlee', 75)]
df = sqlContext.createDataFrame(data)
df.show()

+-------+---+
|     _1| _2|
+-------+---+
|  Alice| 50|
|    Bob| 80|
|Charlee| 75|
+-------+---+



In [22]:
#if u need to name the col
sqlContext.createDataFrame(data, ['Name', 'Score']).show()

+-------+-----+
|   Name|Score|
+-------+-----+
|  Alice|   50|
|    Bob|   80|
|Charlee|   75|
+-------+-----+



In [23]:
complex_data = [
                 (1.0,
                  10,
                  "Alice", 
                  True, 
                  [1, 2, 3], 
                  {"k1": 0},
                  Row(a=1, b=2, c=3), 
                  datetime(2014, 8, 1, 14, 1, 5)),

                 (2.0,
                  20,
                  "Bob", 
                  True, 
                  [1, 2, 3, 4, 5], 
                  {"k1": 0,"k2": 1 }, 
                  Row(a=1, b=2, c=3), 
                  datetime(2014, 8, 1, 14, 1, 5)),

                  (3.0,
                   30,
                   "Charlee", 
                   False, 
                   [1, 2, 3, 4, 5, 6], 
                   {"k1": 0, "k2": 1, "k3": 2 }, 
                   Row(a=1, b=2, c=3), 
                   datetime(2014, 8, 1, 14, 1, 5))
                ] 

In [24]:
sqlContext.createDataFrame(complex_data).show()

+---+---+-------+-----+------------------+--------------------+---------+-------------------+
| _1| _2|     _3|   _4|                _5|                  _6|       _7|                 _8|
+---+---+-------+-----+------------------+--------------------+---------+-------------------+
|1.0| 10|  Alice| true|         [1, 2, 3]|           [k1 -> 0]|[1, 2, 3]|2014-08-01 14:01:05|
|2.0| 20|    Bob| true|   [1, 2, 3, 4, 5]|  [k1 -> 0, k2 -> 1]|[1, 2, 3]|2014-08-01 14:01:05|
|3.0| 30|Charlee|false|[1, 2, 3, 4, 5, 6]|[k3 -> 2, k1 -> 0...|[1, 2, 3]|2014-08-01 14:01:05|
+---+---+-------+-----+------------------+--------------------+---------+-------------------+



In [32]:
# to name the columns
df_complexDatatype = sqlContext.createDataFrame(complex_data, [
        'col_integer',
        'col_float',
        'col_string',
        'col_boolean',
        'col_list',
        'col_dictionary',
        'col_row',
        'col_date_time']
    )
df_complexDatatype.show()

+-----------+---------+----------+-----------+------------------+--------------------+---------+-------------------+
|col_integer|col_float|col_string|col_boolean|          col_list|      col_dictionary|  col_row|      col_date_time|
+-----------+---------+----------+-----------+------------------+--------------------+---------+-------------------+
|        1.0|       10|     Alice|       true|         [1, 2, 3]|           [k1 -> 0]|[1, 2, 3]|2014-08-01 14:01:05|
|        2.0|       20|       Bob|       true|   [1, 2, 3, 4, 5]|  [k1 -> 0, k2 -> 1]|[1, 2, 3]|2014-08-01 14:01:05|
|        3.0|       30|   Charlee|      false|[1, 2, 3, 4, 5, 6]|[k3 -> 2, k1 -> 0...|[1, 2, 3]|2014-08-01 14:01:05|
+-----------+---------+----------+-----------+------------------+--------------------+---------+-------------------+



#### Creating dataframes using SQL Context and the Row function
     Row functions can be used without specifying column names

In [26]:
data = sc.parallelize([
    Row(1, "Alice", 50),
    Row(2, "Bob", 80),
    Row(3, "Charlee", 75)
])

In [27]:
column_names = Row('id', 'name', 'score')  
students = data.map(lambda r: column_names(*r))

In [28]:
students

PythonRDD[118] at RDD at PythonRDD.scala:53

In [29]:
students_df = sqlContext.createDataFrame(students)
students_df

DataFrame[id: bigint, name: string, score: bigint]

In [30]:
students_df.show()

+---+-------+-----+
| id|   name|score|
+---+-------+-----+
|  1|  Alice|   50|
|  2|    Bob|   80|
|  3|Charlee|   75|
+---+-------+-----+



# Operations on DataFrame

In [33]:
#operations will be done in below df which is created above
df_complexDatatype.show()

+-----------+---------+----------+-----------+------------------+--------------------+---------+-------------------+
|col_integer|col_float|col_string|col_boolean|          col_list|      col_dictionary|  col_row|      col_date_time|
+-----------+---------+----------+-----------+------------------+--------------------+---------+-------------------+
|        1.0|       10|     Alice|       true|         [1, 2, 3]|           [k1 -> 0]|[1, 2, 3]|2014-08-01 14:01:05|
|        2.0|       20|       Bob|       true|   [1, 2, 3, 4, 5]|  [k1 -> 0, k2 -> 1]|[1, 2, 3]|2014-08-01 14:01:05|
|        3.0|       30|   Charlee|      false|[1, 2, 3, 4, 5, 6]|[k3 -> 2, k1 -> 0...|[1, 2, 3]|2014-08-01 14:01:05|
+-----------+---------+----------+-----------+------------------+--------------------+---------+-------------------+




#### Extracting specific rows from dataframes

In [35]:
#first() - takes the first row
df_complexDatatype.first()

Row(col_integer=1.0, col_float=10, col_string='Alice', col_boolean=True, col_list=[1, 2, 3], col_dictionary={'k1': 0}, col_row=Row(a=1, b=2, c=3), col_date_time=datetime.datetime(2014, 8, 1, 14, 1, 5))

In [36]:
#take(integer) - will take the no of rows given
df_complexDatatype.take(2)

[Row(col_integer=1.0, col_float=10, col_string='Alice', col_boolean=True, col_list=[1, 2, 3], col_dictionary={'k1': 0}, col_row=Row(a=1, b=2, c=3), col_date_time=datetime.datetime(2014, 8, 1, 14, 1, 5)),
 Row(col_integer=2.0, col_float=20, col_string='Bob', col_boolean=True, col_list=[1, 2, 3, 4, 5], col_dictionary={'k1': 0, 'k2': 1}, col_row=Row(a=1, b=2, c=3), col_date_time=datetime.datetime(2014, 8, 1, 14, 1, 5))]

#### Extracting specific cells from dataframes

In [38]:
cell_string = df_complexDatatype.collect()[0][2]
cell_string

'Alice'

In [39]:
cell_dictionary = df_complexDatatype.collect()[2][5]
cell_dictionary

{'k3': 2, 'k1': 0, 'k2': 1}

In [41]:
cell_list = df_complexDatatype.collect()[0][4]
cell_list

[1, 2, 3]

In [44]:
cell_list.append(100)
cell_list

[1, 2, 3, 100, 100, 100]

#### Selecting specific columns

In [45]:
#selecting a single column
df_complexDatatype.select('col_float').show()

+---------+
|col_float|
+---------+
|       10|
|       20|
|       30|
+---------+



In [46]:
#selecting specific columns - method 1
df_complexDatatype.select('col_string',
                          'col_dictionary',
                          'col_date_time').show()

+----------+--------------------+-------------------+
|col_string|      col_dictionary|      col_date_time|
+----------+--------------------+-------------------+
|     Alice|           [k1 -> 0]|2014-08-01 14:01:05|
|       Bob|  [k1 -> 0, k2 -> 1]|2014-08-01 14:01:05|
|   Charlee|[k3 -> 2, k1 -> 0...|2014-08-01 14:01:05|
+----------+--------------------+-------------------+



In [48]:
#selecting specific columns - method 2
df_complexDatatype.rdd\
    .map(lambda x : (x.col_string,x.col_date_time))\
    .collect()

[('Alice', datetime.datetime(2014, 8, 1, 14, 1, 5)),
 ('Bob', datetime.datetime(2014, 8, 1, 14, 1, 5)),
 ('Charlee', datetime.datetime(2014, 8, 1, 14, 1, 5))]

#### Editing columns

In [50]:
df_complexDatatype.rdd\
           .map(lambda x: (x.col_string + " Hello"))\
           .collect()

['Alice Hello', 'Bob Hello', 'Charlee Hello']

#### Adding a column in a Dataframe

In [56]:
#creating a sum colum with col_integer+col_float
df_complexDatatype.select('col_integer',
                          'col_float')\
                    .withColumn('Sum',
                                (df_complexDatatype.col_integer+df_complexDatatype.col_float))\
                    .show()

+-----------+---------+----+
|col_integer|col_float| Sum|
+-----------+---------+----+
|        1.0|       10|11.0|
|        2.0|       20|22.0|
|        3.0|       30|33.0|
+-----------+---------+----+



In [57]:
#creating a col_opposite for col_boolen
df_complexDatatype.select('col_boolean')\
                  .withColumn('col_opposite',
                             (df_complexDatatype.col_boolean == False))\
                    .show()

+-----------+------------+
|col_boolean|col_opposite|
+-----------+------------+
|       true|       false|
|       true|       false|
|      false|        true|
+-----------+------------+



#### Editing a column name

In [58]:
df_complexDatatype.withColumnRenamed("col_dictionary","col_map").show()

+-----------+---------+----------+-----------+------------------+--------------------+---------+-------------------+
|col_integer|col_float|col_string|col_boolean|          col_list|             col_map|  col_row|      col_date_time|
+-----------+---------+----------+-----------+------------------+--------------------+---------+-------------------+
|        1.0|       10|     Alice|       true|         [1, 2, 3]|           [k1 -> 0]|[1, 2, 3]|2014-08-01 14:01:05|
|        2.0|       20|       Bob|       true|   [1, 2, 3, 4, 5]|  [k1 -> 0, k2 -> 1]|[1, 2, 3]|2014-08-01 14:01:05|
|        3.0|       30|   Charlee|      false|[1, 2, 3, 4, 5, 6]|[k3 -> 2, k1 -> 0...|[1, 2, 3]|2014-08-01 14:01:05|
+-----------+---------+----------+-----------+------------------+--------------------+---------+-------------------+



#### Using Alias name for a column

In [60]:
df_complexDatatype.select(df_complexDatatype['col_string'].alias('Name')).show()

+-------+
|   Name|
+-------+
|  Alice|
|    Bob|
|Charlee|
+-------+



#### Interoperablity between Pandas dataframe and Spark dataframe

In [61]:
import pandas

In [None]:
df_pandas = complex_data_df.toPandas()
df_pandas