<a href="https://colab.research.google.com/github/birusolankar/Pyspark-Bigdata/blob/main/PySpark_16_11_2024.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pyspark

In [2]:
from pyspark.sql import functions as f
from pyspark.sql import SparkSession

from pyspark.sql.window import Window

In [3]:
# Create (or reuse an already running) SparkSession; every later cell uses `spark`.
spark = (
    SparkSession.builder
    .appName('sample')
    .getOrCreate()
)

In [4]:
# help(spark.createDataFrame)

In [5]:
# Build a DataFrame from scratch out of an in-memory list of tuples.
# No schema is passed, so Spark infers the column types and auto-names the
# columns _1, _2, _3.
data = [
    (1, 'Biru', 'data analyst'),
    (2, 'Laxmi', 'Revenue officer'),
    (3, 'Anuj', 'Data Engineer'),
    (4, 'Shubham', 'Cloud Engineer'),
    (5, 'Manali', 'Security analsy'),
]

df = spark.createDataFrame(data=data)

In [6]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [7]:
# Print the schema Spark inferred: with no schema supplied, the columns are
# named _1, _2, _3 and the Python ints are inferred as long.
df.printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)



In [8]:
# Display up to 20 rows with long values truncated (PySpark's show() defaults).
df.show(n=20, truncate=True)

+---+-------+---------------+
| _1|     _2|             _3|
+---+-------+---------------+
|  1|   Biru|   data analyst|
|  2|  Laxmi|Revenue officer|
|  3|   Anuj|  Data Engineer|
|  4|Shubham| Cloud Engineer|
|  5| Manali|Security analsy|
+---+-------+---------------+



In [9]:
# Same rows, but supply column names so Spark no longer falls back to _1.._3;
# the column types are still inferred from the data.
df = spark.createDataFrame(data, ['id', 'name', 'role'])

In [10]:
# Display up to 20 rows with long values truncated (PySpark's show() defaults).
df.show(n=20, truncate=True)

+---+-------+---------------+
| id|   name|           role|
+---+-------+---------------+
|  1|   Biru|   data analyst|
|  2|  Laxmi|Revenue officer|
|  3|   Anuj|  Data Engineer|
|  4|Shubham| Cloud Engineer|
|  5| Manali|Security analsy|
+---+-------+---------------+



In [11]:

# Same rows again, now with an explicit StructType so that 'id' is typed as a
# 32-bit IntegerType instead of the long that inference produced earlier.
data = [
    (1, 'Biru', 'data analyst'),
    (2, 'Laxmi', 'Revenue officer'),
    (3, 'Anuj', 'Data Engineer'),
    (4, 'Shubham', 'Cloud Engineer'),
    (5, 'Manali', 'Security analsy'),
]

schema = StructType([
    StructField('id', IntegerType()),
    StructField('name', StringType()),
    StructField('role', StringType()),
])

df = spark.createDataFrame(data, schema)

In [12]:
# Display up to 20 rows with long values truncated (PySpark's show() defaults).
df.show(n=20, truncate=True)

+---+-------+---------------+
| id|   name|           role|
+---+-------+---------------+
|  1|   Biru|   data analyst|
|  2|  Laxmi|Revenue officer|
|  3|   Anuj|  Data Engineer|
|  4|Shubham| Cloud Engineer|
|  5| Manali|Security analsy|
+---+-------+---------------+



In [13]:
from pyspark.sql.functions import split
from pyspark.sql.functions import regexp_replace

In [14]:
# Add 'new_role': the 'role' text with every space character removed.
df.withColumn(
    'new_role',
    regexp_replace(col('role'), ' ', ''),
).show()

+---+-------+---------------+--------------+
| id|   name|           role|      new_role|
+---+-------+---------------+--------------+
|  1|   Biru|   data analyst|   dataanalyst|
|  2|  Laxmi|Revenue officer|Revenueofficer|
|  3|   Anuj|  Data Engineer|  DataEngineer|
|  4|Shubham| Cloud Engineer| CloudEngineer|
|  5| Manali|Security analsy|Securityanalsy|
+---+-------+---------------+--------------+



In [15]:
# Add 'new_role': the words of 'role' as an array.
# split() takes a Java regex. Splitting on a literal ' ' yields empty-string
# tokens for repeated spaces or for leading/trailing spaces, so we trim first
# and then split on runs of whitespace.
df.withColumn(
    'new_role',
    f.split(f.trim(f.col('role')), r'\s+'),
).show(truncate=False)

+---+-------+---------------+------------------+
|id |name   |role           |new_role          |
+---+-------+---------------+------------------+
|1  |Biru   |data analyst   |[data, analyst]   |
|2  |Laxmi  |Revenue officer|[Revenue, officer]|
|3  |Anuj   |Data Engineer  |[Data, Engineer]  |
|4  |Shubham|Cloud Engineer |[Cloud, Engineer] |
|5  |Manali |Security analsy|[Security, analsy]|
+---+-------+---------------+------------------+



In [16]:
# Print every name lower-cased.
# The lower-casing is done in Spark (f.lower) rather than with str.lower() on
# the driver. A NULL name then comes back as None instead of raising
# AttributeError on None.lower().
for row in df.select(f.lower(f.col('name')).alias('name')).collect():
    print(row['name'])

biru
laxmi
anuj
shubham
manali


In [17]:
# Bring only the 'name' column back to the driver as a list of Row objects.
df.select('name').collect()

[Row(name='Biru'),
 Row(name='Laxmi'),
 Row(name='Anuj'),
 Row(name='Shubham'),
 Row(name='Manali')]

In [18]:
# Bring every row back to the driver as a list of Row objects.
# This is fine for a tiny DataFrame like this one, but collect() loads the
# entire result into driver memory.
df.collect()

[Row(id=1, name='Biru', role='data analyst'),
 Row(id=2, name='Laxmi', role='Revenue officer'),
 Row(id=3, name='Anuj', role='Data Engineer'),
 Row(id=4, name='Shubham', role='Cloud Engineer'),
 Row(id=5, name='Manali', role='Security analsy')]

In [19]:
# Plain Python (no Spark involved): upper-case a string value.
name = "biru"
str.upper(name)

'BIRU'

In [20]:
# Sample orders as (customerid, product_category, order_amount) tuples.
# Indentation is normalized to spaces; the original list mixed tabs and spaces.
data = [
    (10, 'cloth', 10000),
    (11, 'toy', 30000),
    (12, 'toy', 2453),
    (13, 'books', 5000),
    (14, 'books', 2354),
    (15, 'cloth', 2341),
    (16, 'cloth', 1000),
    (17, 'toy', 3000),
    (18, 'toy', 2753),
    (18, 'books', 5400),  # NOTE(review): customerid 18 appears twice; confirm this is intended
    (19, 'books', 2054),
    (20, 'cloth', 5541),
]

# Only column names are given here, so Spark infers the column types.
schema = ['customerid', 'product_category', 'order_amount']

df = spark.createDataFrame(data, schema)

In [21]:
# Top two order amounts within each product category (not just the single top).
# dense_rank gives tied amounts the same rank, so a category can return more
# than two rows when amounts tie.
df.withColumn(
    'amount_rank',
    f.dense_rank().over(
        Window.partitionBy('product_category').orderBy(f.col('order_amount').desc())
    ),
).filter(f.col('amount_rank') <= 2).show()

+----------+----------------+------------+-----------+
|customerid|product_category|order_amount|amount_rank|
+----------+----------------+------------+-----------+
|        18|           books|        5400|          1|
|        13|           books|        5000|          2|
|        10|           cloth|       10000|          1|
|        20|           cloth|        5541|          2|
|        11|             toy|       30000|          1|
|        17|             toy|        3000|          2|
+----------+----------------+------------+-----------+



In [22]:
from pyspark.sql.functions import dense_rank, rank, row_number

In [23]:
# Same top-two-per-category query, written with the directly imported
# dense_rank instead of the f. prefix. Tied amounts share a rank.
(
    df.withColumn(
        'amount_rank',
        dense_rank().over(
            Window.partitionBy('product_category')
            .orderBy(col('order_amount').desc())
        ),
    )
    .filter(col('amount_rank') <= 2)
    .show()
)

+----------+----------------+------------+-----------+
|customerid|product_category|order_amount|amount_rank|
+----------+----------------+------------+-----------+
|        18|           books|        5400|          1|
|        13|           books|        5000|          2|
|        10|           cloth|       10000|          1|
|        20|           cloth|        5541|          2|
|        11|             toy|       30000|          1|
|        17|             toy|        3000|          2|
+----------+----------------+------------+-----------+



In [25]:
# Top two orders per category using row_number. Unlike dense_rank, ties never
# share a position, so each category yields at most two rows.
# customerid is a secondary sort key. Without it, tied order_amounts would be
# numbered in whatever order Spark happens to process them, and the "top two"
# could change between runs.
(
    df.withColumn(
        "row_number",
        row_number().over(
            Window.partitionBy(col('product_category'))
            .orderBy(col('order_amount').desc(), col('customerid'))
        ),
    )
    .filter(col('row_number') <= 2)
    .show()
)

+----------+----------------+------------+----------+
|customerid|product_category|order_amount|row_number|
+----------+----------------+------------+----------+
|        18|           books|        5400|         1|
|        13|           books|        5000|         2|
|        10|           cloth|       10000|         1|
|        20|           cloth|        5541|         2|
|        11|             toy|       30000|         1|
|        17|             toy|        3000|         2|
+----------+----------------+------------+----------+



In [29]:
# Check whether each product_category value is integer-like text, e.g. '42'.
# Text that cannot be parsed as an int, such as 'toy' or 'a1', gives False.
# try_cast returns NULL for unparsable values even when
# spark.sql.ansi.enabled=true (the default in Spark 4.x). Under that setting a
# plain .cast('int') raises CAST_INVALID_INPUT instead of returning NULL.
df.select(
    f.expr('try_cast(product_category AS INT)').isNotNull().alias('is_integer')
).show()

+-------------------------------------------+
|(CAST(product_category AS INT) IS NOT NULL)|
+-------------------------------------------+
|                                      false|
|                                      false|
|                                      false|
|                                      false|
|                                      false|
|                                      false|
|                                      false|
|                                      false|
|                                      false|
|                                      false|
|                                      false|
|                                      false|
+-------------------------------------------+

