#### PySpark Environment Setup

In [0]:
import os
import sys
# Point both PySpark interpreter settings (general and driver-specific)
# at the Python executable that is running this notebook.
os.environ.update({
    "PYSPARK_PYTHON": sys.executable,
    "PYSPARK_DRIVER_PYTHON": sys.executable,
})

#### Create SparkSession

In [0]:
# Import SparkSession, the entry point used by every cell below
from pyspark.sql import SparkSession

# Build (or reuse) a session named "Sparksession" with a local master
spark = (
    SparkSession.builder
    .appName("Sparksession")
    .master("local")
    .getOrCreate()
)

In [0]:
# Bare expression: the notebook renders the session object as this cell's output
spark

#### Creating DataFrame

In [0]:
# Build an RDD with no elements through the session's SparkContext
rdd = spark.sparkContext.emptyRDD()
print(rdd)

EmptyRDD[0] at emptyRDD at NativeMethodAccessorImpl.java:0


In [0]:
from pyspark.sql.types import StructType,StructField,StringType

# Three nullable string columns, generated from the list of column names
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ('First Name', 'Middle Name', 'Last Name')
])
# Apply the schema to the empty RDD: a DataFrame with columns but no rows
df = spark.createDataFrame(rdd, schema)

In [0]:
# Print the column names, types and nullability of the empty DataFrame
df.printSchema()

root
 |-- First Name: string (nullable = true)
 |-- Middle Name: string (nullable = true)
 |-- Last Name: string (nullable = true)



#### Converting an existing RDD to Dataframe

In [0]:
# Convert the existing (empty) RDD into a DataFrame by supplying the schema
df1 = rdd.toDF(schema=schema)
print(df1)

DataFrame[First Name: string, Middle Name: string, Last Name: string]


#### Creating an empty DataFrame

In [0]:
# An empty list plus an explicit schema gives a DataFrame with columns but no rows
df2 = spark.createDataFrame(data=[], schema=schema)
df2.printSchema()

root
 |-- First Name: string (nullable = true)
 |-- Middle Name: string (nullable = true)
 |-- Last Name: string (nullable = true)



#### Creating Empty DataFrame without any schema

In [0]:
# An empty StructType means the resulting DataFrame has no columns at all
df3 = spark.createDataFrame(data=[], schema=StructType([]))
print(df3)

DataFrame[]


### Converting pyspark Rdd to Dataframe

#### 1. Creating PySpark RDD

In [0]:
# Sample (name, number) pairs; the string values are kept exactly as entered
dept = [
    ("Finanace", 300),
    ("Marketing", 450),
    ("Promotions", 250),
]
# Turn the local list into an RDD
rdd = spark.sparkContext.parallelize(dept)

#### 2.Convert pyspark Rdd to DataFrame

##### a.Using toDF()

In [0]:
# Without column names the columns are called _1 and _2 and the types are inferred
df = rdd.toDF()
df.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)



In [0]:
# Supply explicit column names for the conversion
columns = ["Sector", "Id"]
df = rdd.toDF(schema=columns)
df.printSchema()
df.show()

root
 |-- Sector: string (nullable = true)
 |-- Id: long (nullable = true)

+----------+---+
|    Sector| Id|
+----------+---+
|  Finanace|300|
| Marketing|450|
|Promotions|250|
+----------+---+



##### b.Using createDataFrame()

In [0]:
# Same conversion through the session API: pass the RDD and the column names
df1 = spark.createDataFrame(data=rdd, schema=columns)
df1.show()

+----------+---+
|    Sector| Id|
+----------+---+
|  Finanace|300|
| Marketing|450|
|Promotions|250|
+----------+---+



#### Converting DataFrame into Pandas

In [0]:
# Convert the Spark DataFrame into a pandas DataFrame and print it
df_pandas = df.toPandas()
print(df_pandas)

       Sector   Id
0    Finanace  300
1   Marketing  450
2  Promotions  250


#### Show()

In [0]:
# Quotes of varying length, used below to demonstrate the options of show()
columns = ["Seqno", "Quote"]
data = [
    ("1", "Be the change that you wish to see in the world"),
    ("2", "Everyone thinks of changing the world, but no one thinks of changing himself."),
    ("3", "The purpose of our lives is to be happy."),
    ("4", "Be cool."),
]
df = spark.createDataFrame(data=data, schema=columns)

In [0]:
# With no arguments, show() truncates values longer than 20 characters
# (truncated values end in "..." in the output)
df.show()

+-----+--------------------+
|Seqno|               Quote|
+-----+--------------------+
|    1|Be the change tha...|
|    2|Everyone thinks o...|
|    3|The purpose of ou...|
|    4|            Be cool.|
+-----+--------------------+



In [0]:
# Passing a number to show() limits the output to that many rows
df.show(3)   # n = 3: only the first three rows are printed

+-----+--------------------+
|Seqno|               Quote|
+-----+--------------------+
|    1|Be the change tha...|
|    2|Everyone thinks o...|
|    3|The purpose of ou...|
+-----+--------------------+
only showing top 3 rows



In [0]:
# When truncate is an integer (here 40), values longer than that many characters are cut off
df.show(truncate = 40)

+-----+----------------------------------------+
|Seqno|                                   Quote|
+-----+----------------------------------------+
|    1|Be the change that you wish to see in...|
|    2|Everyone thinks of changing the world...|
|    3|The purpose of our lives is to be happy.|
|    4|                                Be cool.|
+-----+----------------------------------------+



In [0]:
# With truncate set to False, the column values are displayed in full
df.show(3, truncate = False)

+-----+-----------------------------------------------------------------------------+
|Seqno|Quote                                                                        |
+-----+-----------------------------------------------------------------------------+
|1    |Be the change that you wish to see in the world                              |
|2    |Everyone thinks of changing the world, but no one thinks of changing himself.|
|3    |The purpose of our lives is to be happy.                                     |
+-----+-----------------------------------------------------------------------------+
only showing top 3 rows



In [0]:
# To print each record vertically (one line per column value), pass vertical = True
df.show(n = 3, truncate = False, vertical = True)

-RECORD 0------------------------------------------------------------------------------
 Seqno | 1                                                                             
 Quote | Be the change that you wish to see in the world                               
-RECORD 1------------------------------------------------------------------------------
 Seqno | 2                                                                             
 Quote | Everyone thinks of changing the world, but no one thinks of changing himself. 
-RECORD 2------------------------------------------------------------------------------
 Seqno | 3                                                                             
 Quote | The purpose of our lives is to be happy.                                      
only showing top 3 rows



#### StructType and StructField

##### Define columns with schema

In [0]:
# Import the type classes used to declare the schema
from pyspark.sql.types import StructType, StringType, StructField, IntegerType, FloatType

# Declare the schema as (column name, data type) pairs; every column is nullable
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('Student Id', IntegerType()),
        ('First Name', StringType()),
        ('Last Name', StringType()),
        ('Attendance', FloatType()),
    ]
])

# Rows to load, in schema order: (student id, first name, last name, attendance)
data = [
    (1, "Satya", "Komati", 89.07),
    (2, "Adhi", "", 73.89),
    (3, "Pavi", "Gorrela", 92.676),
    (4, "Vinod", "Gorrela", 87.273),
    (5, "Vishal", "", 84.30),
]

In [0]:
# Build the DataFrame from the rows and the explicit schema
df = spark.createDataFrame(data=data, schema=schema)

# Inspect the resulting column names and types
df.printSchema()

root
 |-- Student Id: integer (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Attendance: float (nullable = true)



In [0]:
# Show the rows of the DataFrame
df.show()

+----------+----------+---------+----------+
|Student Id|First Name|Last Name|Attendance|
+----------+----------+---------+----------+
|         1|     Satya|   Komati|     89.07|
|         2|      Adhi|         |     73.89|
|         3|      Pavi|  Gorrela|    92.676|
|         4|     Vinod|  Gorrela|    87.273|
|         5|    Vishal|         |      84.3|
+----------+----------+---------+----------+



#### Nesting the Schema

In [0]:
# Each row is (id, (first name, last name), percent); the inner tuple fills the nested struct
structure_data = [
    (1, ("Dhilli", ""), 23.40),
    (2, ("Rolex", "Watson"), 85.34),
    (3, ("Amar", ""), 12.43),
    (4, ("Leo", "Das"), 98.23),
    (5, ("Vikram", "Iyer"), 2.09),
]

# Schema with a struct column "name" nested between two flat columns
structured_schema = StructType([
    StructField("Criminal_Id", IntegerType(), True),
    StructField(
        "name",
        StructType([
            StructField("first_name", StringType(), True),
            StructField("last_name", StringType(), True),
        ]),
    ),
    StructField("Criminal_Percent", FloatType(), True),
])


In [0]:
# Build the DataFrame with the nested "name" struct from the rows and schema defined above
lcu_df=spark.createDataFrame(structure_data,structured_schema)

In [0]:
# Show all rows without truncation; the struct column prints as {first_name, last_name}
lcu_df.show(truncate=False)

+-----------+---------------+----------------+
|Criminal_Id|name           |Criminal_Percent|
+-----------+---------------+----------------+
|1          |{Dhilli, }     |23.4            |
|2          |{Rolex, Watson}|85.34           |
|3          |{Amar, }       |12.43           |
|4          |{Leo, Das}     |98.23           |
|5          |{Vikram, Iyer} |2.09            |
+-----------+---------------+----------------+



#### Updating the structure of the dataframe

In [0]:
from pyspark.sql.functions import col,when,struct

# Add a struct column "Other info" that bundles the existing columns under new
# field names, together with a derived "Eligibility" field.
updated_df = df.withColumn(
    "Other info",
    struct(
        col("Student Id").alias("id"),
        col("First Name").alias("first_name"),
        col("Last Name").alias("last_name"),
        col("Attendance").alias("percent"),
        # Attendance is cast to an integer before being compared with 75
        when(col("Attendance").cast(IntegerType()) < 75, "Not Eligible")
        .otherwise("Eligible")
        .alias("Eligibility"),
    ),
)
updated_df.printSchema()

root
 |-- Student Id: integer (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Attendance: float (nullable = true)
 |-- Other info: struct (nullable = false)
 |    |-- id: integer (nullable = true)
 |    |-- first_name: string (nullable = true)
 |    |-- last_name: string (nullable = true)
 |    |-- percent: float (nullable = true)
 |    |-- Eligibility: string (nullable = false)



In [0]:
# Show the original columns next to the new struct column, without truncation
updated_df.show(truncate=False)

+----------+----------+---------+----------+-------------------------------------+
|Student Id|First Name|Last Name|Attendance|Other info                           |
+----------+----------+---------+----------+-------------------------------------+
|1         |Satya     |Komati   |89.07     |{1, Satya, Komati, 89.07, Eligible}  |
|2         |Adhi      |         |73.89     |{2, Adhi, , 73.89, Not Eligible}     |
|3         |Pavi      |Gorrela  |92.676    |{3, Pavi, Gorrela, 92.676, Eligible} |
|4         |Vinod     |Gorrela  |87.273    |{4, Vinod, Gorrela, 87.273, Eligible}|
|5         |Vishal    |         |84.3      |{5, Vishal, , 84.3, Eligible}        |
+----------+----------+---------+----------+-------------------------------------+



#### Adding the new columns to the DataFrame

In [0]:
# Add a top-level "Eligibility" column: "No" when the integer-cast attendance
# is below 75, otherwise "Yes"
updated_df_with_eligibility = df.withColumn(
    "Eligibility",
    when(col("Attendance").cast(IntegerType()) < 75, "No").otherwise("Yes"),
)

In [0]:
# Confirm that the new "Eligibility" column was appended to the schema
updated_df_with_eligibility.printSchema()

root
 |-- Student Id: integer (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Attendance: float (nullable = true)
 |-- Eligibility: string (nullable = false)



In [0]:
# Show every row with the derived Yes/No value, without truncation
updated_df_with_eligibility.show(truncate=False)

+----------+----------+---------+----------+-----------+
|Student Id|First Name|Last Name|Attendance|Eligibility|
+----------+----------+---------+----------+-----------+
|1         |Satya     |Komati   |89.07     |Yes        |
|2         |Adhi      |         |73.89     |No         |
|3         |Pavi      |Gorrela  |92.676    |Yes        |
|4         |Vinod     |Gorrela  |87.273    |Yes        |
|5         |Vishal    |         |84.3      |Yes        |
+----------+----------+---------+----------+-----------+



#### Using SQL Array and Map Types

In [0]:
from pyspark.sql.types import ArrayType,MapType

# Schema combining scalar columns with an array-of-strings column ("Hobbies")
# and a string-to-string map column ("Properties")
arrayAndMapSchema = StructType([
    StructField("First Name", StringType(), True),
    StructField("Last Name", StringType(), True),
    StructField("Attendance", FloatType(), True),
    StructField("Hobbies", ArrayType(StringType()), True),
    StructField("Properties", MapType(StringType(), StringType()), True),
])

In [0]:
# When there are many columns, schema.json() prints the schema as a JSON string
print(df.schema.json())

{"fields":[{"metadata":{},"name":"Student Id","nullable":true,"type":"integer"},{"metadata":{},"name":"First Name","nullable":true,"type":"string"},{"metadata":{},"name":"Last Name","nullable":true,"type":"string"},{"metadata":{},"name":"Attendance","nullable":true,"type":"float"}],"type":"struct"}


In [0]:
# simpleString() gives a compact one-line form: struct<name:type,...>
print(df.schema.simpleString())

struct<Student Id:int,First Name:string,Last Name:string,Attendance:float>


#### Renaming Columns

In [0]:
# Rename three columns to snake_case; "Attendance" keeps its name.
# Note: this rebinds df, and the cells below rely on the new column names.
df = (
    df.withColumnRenamed("First Name", "first_name")
    .withColumnRenamed("Last Name", "last_name")
    .withColumnRenamed("Student Id", "student_id")
)

In [0]:
# Verify the renamed columns in the schema
df.printSchema()

root
 |-- student_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- Attendance: float (nullable = true)



In [0]:
# Sample rows: ((first, middle, last), dob, gender, salary)
dataDF = [
    (('James', '', 'Smith'), '1991-04-01', 'M', 3000),
    (('Michael', 'Rose', ''), '2000-05-19', 'M', 4000),
    (('Robert', '', 'Williams'), '1978-09-05', 'M', 4000),
    (('Maria', 'Anne', 'Jones'), '1967-12-01', 'F', 4000),
    (('Jen', 'Mary', 'Brown'), '1980-02-17', 'F', -1),
]

# "name" is a nested struct of three string fields; the other columns are flat
schema = StructType([
    StructField('name', StructType([
        StructField(name_part, StringType(), True)
        for name_part in ('firstname', 'middlename', 'lastname')
    ])),
    StructField('dob', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('salary', IntegerType(), True),
])

data_df = spark.createDataFrame(data=dataDF, schema=schema)
data_df.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [0]:
# toDF(*names) renames the top-level columns by position;
# the fields inside the nested struct keep their names
newColumns = ["newCol1", "newCol2", "newCol3", "newCol4"]
data_df.toDF(*newColumns).printSchema()

root
 |-- newCol1: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- newCol2: string (nullable = true)
 |-- newCol3: string (nullable = true)
 |-- newCol4: integer (nullable = true)



In [0]:
# List the top-level column names of data_df (still the original names)
data_df.columns

Out[43]: ['name', 'dob', 'gender', 'salary']

#### Column Objects

In [0]:
from pyspark.sql.functions import lit
# Create a column object from a literal string value
# NOTE(review): colObj is not referenced in the cells visible here
colObj=lit("sparkbyexamples.com")

In [0]:
# Recall the nested schema of lcu_df before selecting columns from it
lcu_df.printSchema()

root
 |-- Criminal_Id: integer (nullable = true)
 |-- name: struct (nullable = true)
 |    |-- first_name: string (nullable = true)
 |    |-- last_name: string (nullable = true)
 |-- Criminal_Percent: float (nullable = true)



In [0]:
# Access a column as an attribute of the DataFrame (the . operator)
df.select(df.Attendance).show()

+----------+
|Attendance|
+----------+
|     89.07|
|     73.89|
|    92.676|
|    87.273|
|      84.3|
+----------+



In [0]:
# Access columns by name with df["column name"]
df.select(df["first_name"], df["last_name"]).show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|     Satya|   Komati|
|      Adhi|         |
|      Pavi|  Gorrela|
|     Vinod|  Gorrela|
|    Vishal|         |
+----------+---------+



In [0]:
# Refer to columns by name with the col() function
from pyspark.sql.functions import col
df.select(col("student_id"),col("first_name")).show()

+----------+----------+
|student_id|first_name|
+----------+----------+
|         1|     Satya|
|         2|      Adhi|
|         3|      Pavi|
|         4|     Vinod|
|         5|    Vishal|
+----------+----------+



In [0]:
# Select a field inside the "name" struct with dotted "struct.field" notation
lcu_df.select(lcu_df["name.first_name"]).show()

+----------+
|first_name|
+----------+
|    Dhilli|
|     Rolex|
|      Amar|
|       Leo|
|    Vikram|
+----------+



#### Creating a DataFrame using Row Function

In [0]:
from pyspark.sql import Row

# Each record is a Row whose "prop" field is itself a Row
data = [
    Row(name="James", prop=Row(hair="black", eye="brown")),
    Row(name="Rahul", prop=Row(hair="blue", eye="reddish")),
]
df_drop = spark.createDataFrame(data)

In [0]:
# "prop.*" expands the nested struct into one column per field; show(1) prints only the first row
df_drop.select(col("prop.*")).show(1)

+-----+-----+
| hair|  eye|
+-----+-----+
|black|brown|
+-----+-----+
only showing top 1 row



#### Arithmetic and comparison operations

In [0]:
# Three integer columns; toDF() names them col1, col2 and col3
data = [
    (150, 23, 8),
    (180, 43, 5),
    (129, 85, 1),
]
ndf = spark.createDataFrame(data).toDF("col1", "col2", "col3")

In [0]:
# Show the numeric sample data used by the operator examples
ndf.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
| 150|  23|   8|
| 180|  43|   5|
| 129|  85|   1|
+----+----+----+



In [0]:
# Arithmetic between columns. Only the sum gets an explicit alias; the other
# results keep the column name generated from the expression.
ndf.select((ndf["col1"] + ndf["col2"]).alias("Sum")).show()
for arithmetic_expr in (
    ndf["col1"] - ndf["col2"],
    ndf["col1"] * ndf["col3"],
    ndf["col1"] / ndf["col3"],
    ndf["col1"] % ndf["col3"],
):
    ndf.select(arithmetic_expr).show()

print("----------------------------------------------------------------------")

# Comparisons between col1 and col3; each produces a boolean column
for comparison_expr in (
    ndf["col1"] < ndf["col3"],
    ndf["col1"] > ndf["col3"],
    ndf["col1"] <= ndf["col3"],
    ndf["col1"] >= ndf["col3"],
    ndf["col1"] == ndf["col3"],
):
    ndf.select(comparison_expr).show()

+---+
|Sum|
+---+
|173|
|223|
|214|
+---+

+-------------+
|(col1 - col2)|
+-------------+
|          127|
|          137|
|           44|
+-------------+

+-------------+
|(col1 * col3)|
+-------------+
|         1200|
|          900|
|          129|
+-------------+

+-------------+
|(col1 / col3)|
+-------------+
|        18.75|
|         36.0|
|        129.0|
+-------------+

+-------------+
|(col1 % col3)|
+-------------+
|            6|
|            0|
|            0|
+-------------+

----------------------------------------------------------------------
+-------------+
|(col1 < col3)|
+-------------+
|        false|
|        false|
|        false|
+-------------+

+-------------+
|(col1 > col3)|
+-------------+
|         true|
|         true|
|         true|
+-------------+

+--------------+
|(col1 <= col3)|
+--------------+
|         false|
|         false|
|         false|
+--------------+

+--------------+
|(col1 >= col3)|
+--------------+
|          true|
|          true|
|  

#### Pyspark column functions

In [0]:
# substr(start position, length): here the first two characters of last_name
df.select(col("last_name").substr(1, 2)).show()

# startswith: true where last_name begins with the given string
df.select(col("last_name").startswith('K')).show()

# endswith: true where last_name ends with the given string
df.select(col("last_name").endswith('a')).show()

+--------------------------+
|substring(last_name, 1, 2)|
+--------------------------+
|                        Ko|
|                          |
|                        Go|
|                        Go|
|                          |
+--------------------------+

+------------------------+
|startswith(last_name, K)|
+------------------------+
|                    true|
|                   false|
|                   false|
|                   false|
|                   false|
+------------------------+

+----------------------+
|endswith(last_name, a)|
+----------------------+
|                 false|
|                 false|
|                  true|
|                  true|
|                 false|
+----------------------+

