In [1]:
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [2]:
import sys

# Register the directory that presumably contains the `my_code` package so it can be imported.
# NOTE(review): this is an absolute, machine-specific path; the notebook will not run
# on another machine unless the path is adjusted (or the package is installed) — confirm.
sys.path.append('/home/vladyslav_podrazhanskyi/projects/PERSONAL/python/learn_spark')

# With the path registered, the project-local module can be imported.
from my_code.utils import ROOT

In [3]:
# Obtain a Spark session (getOrCreate reuses an existing one if present),
# configured with the "local[*]" master URL.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
print(spark)

24/02/12 16:52:58 WARN Utils: Your hostname, EPUAKHAW05DF resolves to a loopback address: 127.0.1.1; using 192.168.100.3 instead (on interface wlp0s20f3)
24/02/12 16:52:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/12 16:52:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<pyspark.sql.session.SparkSession object at 0x7fea46aa6510>


You can manually create a PySpark DataFrame using the 
toDF() and createDataFrame() methods. 
Both methods have several signatures, so a DataFrame can be created from an existing RDD, list, or DataFrame.

In [4]:
# Sample input used to build the DataFrames: the column names and the rows
# (one tuple per row; every value, including the count, is kept as a string).
columns = ["language", "users_count"]
data = [
    ("Java", "20000"),
    ("Python", "100000"),
    ("Scala", "3000"),
]



In [5]:
# 0. Create an RDD from the in-memory list

rdd = spark.sparkContext.parallelize(data)
# Single print call, newline-separated: the RDD itself, then its Python type.
print(rdd, type(rdd), sep="\n")

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289
<class 'pyspark.rdd.RDD'>


In [6]:
"""
1.1 Using toDF() function
PySpark RDD’s toDF() method is used to create a DataFrame from the existing RDD. Since RDD doesn’t have columns, 
the DataFrame is created with default column names “_1” and “_2” as we have two columns.
"""

# convert rdd to spark dataframe without column names using toDF 

dfFromRDD1 = rdd.toDF()      # can be added column names with 
dfFromRDD1.printSchema()

                                                                                

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)



In [7]:
# Convert the RDD to a Spark DataFrame with explicit column names using toDF(columns).

dfFromRDD2 = rdd.toDF(columns)    # RDD.toDF takes the list of names as one argument (no * unpacking)
dfFromRDD2.printSchema()

root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)



In [8]:
"""
1.2 Using createDataFrame() from SparkSession
Using createDataFrame() from SparkSession is another way to create manually 
and it takes rdd object as an argument. and chain with toDF() to specify name to the columns.
"""

# convert rdd with column names using createDataFrame 
# createDataFrame method can accept data both rdd and list (see 2.1.) 


dfFromRDD3 = spark.createDataFrame(rdd).toDF(*columns)   # * in *columns because this DF, not RDD

dfFromRDD3.printSchema()

root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)



In [9]:
"""
2. Create DataFrame from List Collection

In this section, we will see how to create PySpark DataFrame from a list. 
These examples would be similar to what we have seen in the above section with RDD, 
but we use the list data object instead of “rdd” object to create DataFrame.
"""

"""
2.1 Using createDataFrame() from SparkSession
Calling createDataFrame() from SparkSession is another way to create PySpark DataFrame manually, 
it takes a list object as an argument. and chain with toDF() to specify names to the columns.
"""

# df from list

dfFromData = spark.createDataFrame(data).toDF(*columns)   # spark.createDataFrame(data, columns)
dfFromData.printSchema()

root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)



In [10]:
"""
2.2 Using createDataFrame() with the Row type
createDataFrame() has another signature in PySpark which takes the collection of Row type and schema 
for column names as arguments.
To use this first we need to convert our “data” object from the list to list of Row.
"""

# data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]

rowData = map(lambda x: Row(*x), data)
print(list(rowData))

dfFromRowData = spark.createDataFrame(data, columns)
dfFromRowData.show()
dfFromRowData.printSchema()


[<Row('Java', '20000')>, <Row('Python', '100000')>, <Row('Scala', '3000')>]
+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+

root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)



In [15]:
"""
2.3 Create DataFrame with schema
If you wanted to specify the column names along with their data types, 
you should create the StructType schema first and then assign this while creating a DataFrame.
"""

data2 = [
    ("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
]

schema = T.StructType([ 
    T.StructField("firstname", T.StringType(),True), 
    T.StructField("middlename", T.StringType(),True), 
    T.StructField("lastname", T.StringType(),True), 
    T.StructField("id", T.StringType(), True), 
    T.StructField("gender", T.StringType(), True), 
    T.StructField("salary", T.IntegerType(), True) 
  ])

dfData2 = spark.createDataFrame(data2, schema)
dfData2.printSchema()
dfData2.show()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    James|          |   Smith|36636|     M|  3000|
|  Michael|      Rose|        |40288|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

