In [2]:
# findspark is initialised before any pyspark import further down
# (presumably so the local Spark installation becomes importable — confirm for your setup).
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession


# Obtain the notebook's SparkSession: master "local[*]", application name
# "DataFrame_Basics". getOrCreate() hands back an existing session if there is one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("DataFrame_Basics")
    .getOrCreate()
)

In [13]:
# Column labels reused by every DataFrame-creation example below.
columns = ["language", "users_count"]

# Sample records: one (language, users_count) tuple per row.
# Both fields are strings, so the resulting DataFrame columns are string-typed.
data = [
    ("Java", "20000"),
    ("Python", "100000"),
    ("Scala", "3000"),
]

In [47]:
# Show the container type of the sample records (a plain Python list).
type(data)

list

In [14]:
# Keep a handle on the session's SparkContext; it is used below to build RDDs.
sc = spark.sparkContext

In [16]:
# Turn the local list of tuples into an RDD.
rdd = sc.parallelize(data)

# Fetch the RDD's elements back as a list so the cell displays them.
rdd.collect()

[('Java', '20000'), ('Python', '100000'), ('Scala', '3000')]

# Creating DataFrames

## 1. Creating DataFrames from an existing RDD using the toDF() function

In [19]:
# Convert the RDD to a DataFrame without supplying any column names.
dfFromRDD1 = rdd.toDF()

In [24]:
# show() prints the table itself and returns None, so it must not be wrapped in
# print() — doing so only appends a stray "None" line to the cell output.
dfFromRDD1.show()

# No column names were passed to toDF(), so the schema lists the default
# _1 / _2 column names.
dfFromRDD1.printSchema()

+------+------+
|    _1|    _2|
+------+------+
|  Java| 20000|
|Python|100000|
| Scala|  3000|
+------+------+

None
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)



In [33]:
# Same conversion as before, but passing the column names so the DataFrame
# columns are labelled language / users_count.
dfFromRDD2 = rdd.toDF(columns)
dfFromRDD2.show()

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



## 2. Creating Dataframe via SparkSession.createDataFrame function

In [35]:
# Build the DataFrame straight from the Python list, then rename its columns
# by unpacking the label list into toDF().
dfFromRDD3 = (
    spark.createDataFrame(data)
    .toDF(*columns)
)

In [39]:
# Display the rows of the renamed DataFrame.
dfFromRDD3.show()

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



## 3.Creating Dataframe via SparkSession.createDataFrame with data and schema

In [42]:


# Pass the records and the column labels to createDataFrame() in a single call,
# using a list of names as the schema.
dfFromRDD4 = spark.createDataFrame(schema=columns, data=data)
dfFromRDD4.show()


+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



## 4. Creating a DataFrame from Row objects

In [None]:


# createDataFrame() has another signature in PySpark that takes a collection of Row objects plus a schema giving the column names. To use it, we first need to convert our "data" object from a list of tuples into a list of Row objects.

# Note: map() returns an iterator, which can be consumed only once. The rows are therefore materialised into a separate list, and that list is what gets passed to createDataFrame().

In [90]:
from pyspark.sql import Row

# Lazily wrap every tuple of `data` in a Row; map() yields a one-shot iterator.
rowData = map(lambda record: Row(*record), data)

# Drain the iterator into a list so the rows can be reused later on.
listrowData = [*rowData]

# Printing the map object shows only its repr, not the rows it produced.
print(rowData)


<map object at 0x7f4dbd915ed0>


In [101]:
# Create the DataFrame from the materialised list of Row objects, labelling
# the columns with the shared name list.
dfFromData3 = spark.createDataFrame(data=listrowData, schema=columns)
dfFromData3.show()


+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



## 5.Creating a DataFrame from existing List Collection in Python

In [104]:
# Build a DataFrame directly from the Python list of tuples, using the name
# list as the schema.
ListDF = spark.createDataFrame(data=data, schema=columns)
# Show the rows, then the schema (both columns come out as strings).
ListDF.show()
ListDF.printSchema()

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+

root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)



In [None]:
# Defining column names and types with an explicit schema

In [105]:
# Department sample records: (name, numeric id) pairs.
dept = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
]

# Column labels used by the name-list based department examples.
deptColumns = ["dept_name", "dept_id"]

In [112]:
from pyspark.sql.types import StructType, StructField, StringType

# Explicit schema: two nullable string columns, Dept_Name and Dept_Id.
# NOTE(review): the ids in `dept` are Python ints while Dept_Id is declared as
# StringType — confirm that string-typed ids are what is wanted here.
deptschema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("Dept_Name", "Dept_Id")
])

# Create the department DataFrame against that schema and display it.
deptDF = spark.createDataFrame(schema=deptschema, data=dept)
deptDF.show()

+---------+-------+
|Dept_Name|Dept_Id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+



## Using list of Row type

In [113]:

# The same department records, each one expressed as a positional Row.
dept2 = [
    Row("Finance", 10),
    Row("Marketing", 20),
    Row("Sales", 30),
    Row("IT", 40),
]

# Create the DataFrame from the Row list; the name list supplies the column labels.
deptDF2 = spark.createDataFrame(schema=deptColumns, data=dept2)
deptDF2.show()

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+



## Converting List of Row collection Types into RDD and creating DataFrames from that RDD

In [119]:


# Distribute the list of Row objects as an RDD.
dept2_rdd = sc.parallelize(dept2)

# The original cell also called dept2_rdd.collect() here. Its result was neither
# stored nor displayed (a notebook only shows the last expression of a cell),
# so it merely ran an extra Spark job; the call has been dropped.

# Convert the RDD into a DataFrame labelled with the department column names.
dept2_DF_From_RDD = dept2_rdd.toDF(deptColumns)

dept2_DF_From_RDD.show()


+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+



## Converting Pyspark DataFrame to Pandas DataFrame

In [121]:
# Convert the Spark DataFrame into a pandas DataFrame and print it.
PandasDF = deptDF.toPandas()
print(PandasDF)

   Dept_Name Dept_Id
0    Finance      10
1  Marketing      20
2      Sales      30
3         IT      40


## Converting a nested-struct PySpark DataFrame to a pandas DataFrame

In [122]:
# Nested structure elements
from pyspark.sql.types import StructType, StructField, StringType,IntegerType
# Sample records whose first element is itself a (first, middle, last) tuple,
# followed by dob, gender and salary — all given as strings.
dataStruct = [
    (("James", "", "Smith"), "36636", "M", "3000"),
    (("Michael", "Rose", ""), "40288", "M", "4000"),
    (("Robert", "", "Williams"), "42114", "M", "4000"),
    (("Maria", "Anne", "Jones"), "39192", "F", "4000"),
    (("Jen", "Mary", "Brown"), "", "F", "-1"),
]

# Matching schema: `name` is a nested struct of three string fields; the
# remaining columns are flat, nullable strings.
schemaStruct = StructType([
    StructField(
        "name",
        StructType([
            StructField("firstname", StringType(), True),
            StructField("middlename", StringType(), True),
            StructField("lastname", StringType(), True),
        ]),
    ),
    StructField("dob", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", StringType(), True),
])

# Build the nested DataFrame and print its schema tree.
df = spark.createDataFrame(schema=schemaStruct, data=dataStruct)
df.printSchema()

# Convert to pandas; the nested `name` struct lands in a single pandas column.
pandasDF2 = df.toPandas()
print(pandasDF2)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)

                   name    dob gender salary
0      (James, , Smith)  36636      M   3000
1     (Michael, Rose, )  40288      M   4000
2  (Robert, , Williams)  42114      M   4000
3  (Maria, Anne, Jones)  39192      F   4000
4    (Jen, Mary, Brown)             F     -1


## Creating Empty DataFrame

In [123]:
# Schema for the empty DataFrame: three nullable string columns.
schema = StructType([
    StructField(part, StringType(), True)
    for part in ("firstname", "middlename", "lastname")
])

In [124]:
# Create an RDD that contains no elements.
empty_rdd = sc.emptyRDD()

In [125]:
# Convert the empty RDD to a DataFrame; the explicit schema supplies the
# column names and types, since there are no rows to derive them from.
empty_rdd_DF = empty_rdd.toDF(schema)

In [127]:
# Display the (row-less) table: only the header is printed.
empty_rdd_DF.show()

# The schema is still fully defined even though there is no data.
empty_rdd_DF.printSchema()

+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
+---------+----------+--------+

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)

