In [None]:
# Pinned to the PySpark version the outputs in this notebook were produced with;
# %pip installs into the running kernel's environment.
%pip install pyspark==3.5.1


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 3.7 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=b0dce1cf8d6eb25000097f6cff425b8a18a27caec4992ee5a7b5ec040291b25d
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
# Start (or reuse) a local Spark session and build an empty DataFrame
# with an explicit three-column schema.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
from pyspark.sql.types import Row

spark = (
    SparkSession.builder
    .appName('CreateEmptyDataframe')
    .master("local[*]")
    .getOrCreate()
)

# The third StructField argument (True) marks every column as nullable.
schema = StructType([
    StructField("column1", StringType(), True),
    StructField("column2", IntegerType(), True),
    StructField("column3", StringType(), True),
])

# An empty RDD has no rows to infer types from, so the schema is passed explicitly.
empty_rdd = spark.sparkContext.emptyRDD()
empty_df = spark.createDataFrame(empty_rdd, schema)

# Schema once, then the (header-only) table.
empty_df.printSchema()
empty_df.show()



root
 |-- column1: string (nullable = true)
 |-- column2: integer (nullable = true)
 |-- column3: string (nullable = true)

+-------+-------+-------+
|column1|column2|column3|
+-------+-------+-------+
+-------+-------+-------+

root
 |-- column1: string (nullable = true)
 |-- column2: integer (nullable = true)
 |-- column3: string (nullable = true)



In [None]:
# Rename columns — each call returns a new DataFrame; empty_df keeps its names.

# One column: old name -> new name.
renamed_df = empty_df.withColumnRenamed("column1", "user_name")
renamed_df.show()

# Several columns: chain one withColumnRenamed per column.
renamed_df = (
    empty_df
    .withColumnRenamed("column1", "user_name")
    .withColumnRenamed("column2", "user_age")
)
renamed_df.show()

# All columns at once: toDF takes the new names positionally.
df = empty_df.toDF("user_name", "user_age", "user_city")
df.show()


+---------+-------+-------+
|user_name|column2|column3|
+---------+-------+-------+
+---------+-------+-------+

+---------+--------+-------+
|user_name|user_age|column3|
+---------+--------+-------+
+---------+--------+-------+

+---------+--------+---------+
|user_name|user_age|user_city|
+---------+--------+---------+
+---------+--------+---------+



In [None]:
from pyspark.sql.functions import lit

# Start again from empty_df with user-facing column names.
df = empty_df.toDF("user_name", "user_age", "user_city")
df.show()

# Append a constant column; lit() wraps the Python literal as a Column.
# The printed schema shows the new column as non-nullable.
df_with_city = df.withColumn("city", lit("Mumbai"))
df_with_city.show()
df_with_city.printSchema()

+---------+--------+---------+
|user_name|user_age|user_city|
+---------+--------+---------+
+---------+--------+---------+

+---------+--------+---------+----+
|user_name|user_age|user_city|city|
+---------+--------+---------+----+
+---------+--------+---------+----+

root
 |-- user_name: string (nullable = true)
 |-- user_age: integer (nullable = true)
 |-- user_city: string (nullable = true)
 |-- city: string (nullable = false)



In [None]:
from pyspark.sql.functions import col

# Sample rows in (user_name, user_age, user_city) order.
data = [("preet", 20, "mulund"), ("khusal", 30, "thane"), ("dinesh", 20, "mumbai")]

# Column names are spelled out rather than taken from df.columns: that made the
# cell depend on whatever `df` last held, and re-running it after a later cell
# rebinds df (e.g. the 4-column pivot example) would fail.
# Types are inferred from the Python values, so user_age comes out as long.
df = spark.createDataFrame(data=data, schema=["user_name", "user_age", "user_city"])
df.show()

# Keep only two columns, referenced by name.
df2 = df.select(col("user_name"), col("user_city"))
df2.show()


+---------+--------+---------+
|user_name|user_age|user_city|
+---------+--------+---------+
|    preet|      20|   mulund|
|   khusal|      30|    thane|
|   dinesh|      20|   mumbai|
+---------+--------+---------+

+---------+---------+
|user_name|user_city|
+---------+---------+
|    preet|   mulund|
|   khusal|    thane|
|   dinesh|   mumbai|
+---------+---------+



In [None]:
# Convert a column's type. Casting is the Column.cast method, so only `col` is needed;
# withColumn with an existing name ("user_age") replaces that column in the result.
from pyspark.sql.functions import col

df.printSchema()
df_casted = df.withColumn("user_age", col("user_age").cast("string"))
df_casted.printSchema()


root
 |-- user_name: string (nullable = true)
 |-- user_age: long (nullable = true)
 |-- user_city: string (nullable = true)

root
 |-- user_name: string (nullable = true)
 |-- user_age: string (nullable = true)
 |-- user_city: string (nullable = true)



In [None]:
# NOTE(review): this cell repeats the select cell and the cast cell above.
from pyspark.sql.functions import col

# Keep only two columns, referenced by name.
df2 = df.select(col("user_name"), col("user_city"))
df2.show()


# Convert column type: Column.cast is a method, so no extra import is needed;
# withColumn with the existing name "user_age" replaces that column in the result.
df.printSchema()
df_casted = df.withColumn("user_age", col("user_age").cast("string"))
df_casted.printSchema()



+---------+---------+
|user_name|user_city|
+---------+---------+
|    preet|   mulund|
|   khusal|    thane|
|   dinesh|   mumbai|
+---------+---------+

root
 |-- user_name: string (nullable = true)
 |-- user_age: long (nullable = true)
 |-- user_city: string (nullable = true)

root
 |-- user_name: string (nullable = true)
 |-- user_age: string (nullable = true)
 |-- user_city: string (nullable = true)



In [None]:
from pyspark.sql import SparkSession

# getOrCreate() returns the session already running in this kernel if there
# is one; only a fresh kernel gets a new session with this app name.
spark = (
    SparkSession.builder
    .appName("PivotUnpivotExample")
    .master("local[*]")
    .getOrCreate()
)

# One tuple per employee, in the order given by `columns`.
columns = ["Name", "Department", "Salary", "City"]
data = [
    ("preet", "Sales", 5000, "Mumbai"),
    ("rao", "Marketing", 3000, "Pune"),
    ("ram", "Sales", 4000, "Mumbai"),
    ("ankit", "Marketing", 2000, "Goa"),
    ("aniket", "HR", 1000, "Goa"),
]

df = spark.createDataFrame(data, schema=columns)
df.show()

# Pivot: one row per City, one column per distinct Department, each cell the
# summed Salary (null where a city has nobody in that department).
pivot_df = df.groupBy("City").pivot("Department").sum("Salary")
pivot_df.show()


+------+----------+------+------+
|  Name|Department|Salary|  City|
+------+----------+------+------+
| preet|     Sales|  5000|Mumbai|
|   rao| Marketing|  3000|  Pune|
|   ram|     Sales|  4000|Mumbai|
| ankit| Marketing|  2000|   Goa|
|aniket|        HR|  1000|   Goa|
+------+----------+------+------+

+------+----+---------+-----+
|  City|  HR|Marketing|Sales|
+------+----+---------+-----+
|Mumbai|NULL|     NULL| 9000|
|   Goa|1000|     2000| NULL|
|  Pune|NULL|     3000| NULL|
+------+----+---------+-----+

