<a href="https://colab.research.google.com/github/Jayakumar1305/ETL-Workflow-with-PYTHON/blob/main/Day1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

In [None]:
df = SparkSession.builder.appName("ColabPySpark").getOrCreate()


In [None]:
data = [(1, "Alice", 25, 5000), (2, "Bob", 30, 6000), (3, "Charlie", 35, 7000)]
columns = ["ID", "Name", "Age", "Salary"]

df = df.createDataFrame(data, columns)


In [None]:
df.show()

+---+-------+---+------+
| ID|   Name|Age|Salary|
+---+-------+---+------+
|  1|  Alice| 25|  5000|
|  2|    Bob| 30|  6000|
|  3|Charlie| 35|  7000|
+---+-------+---+------+



In [None]:
df.printSchema()

root
 |-- ID: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Salary: long (nullable = true)



In [None]:
df.columns

['ID', 'Name', 'Age']

In [None]:
df.describe().show()

+-------+---+-------+----+
|summary| ID|   Name| Age|
+-------+---+-------+----+
|  count|  3|      3|   3|
|   mean|2.0|   NULL|30.0|
| stddev|1.0|   NULL| 5.0|
|    min|  1|  Alice|  25|
|    max|  3|Charlie|  35|
+-------+---+-------+----+



In [None]:
df = df.withColumn("Bonus", df["Salary"] * 0.1)

In [None]:
df.show()

+---+-------+---+------+-----+
| ID|   Name|Age|Salary|Bonus|
+---+-------+---+------+-----+
|  1|  Alice| 25|  5000|500.0|
|  2|    Bob| 30|  6000|600.0|
|  3|Charlie| 35|  7000|700.0|
+---+-------+---+------+-----+



In [None]:
from pyspark.sql.functions import col

df.select(col("salary")) # Use col instead of Col

DataFrame[salary: bigint]

How to Create a  explicitly Schema


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType #Import LongType

# Initialize Spark Session

In [None]:
spark = SparkSession.builder.appName("Basic").getOrCreate()

# Define Schema with BIGINT (LongType in PySpark)

In [None]:
schema = StructType([
    StructField("ID", LongType(), True),        # BIGINT equivalent
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Salary", LongType(), True)     # BIGINT equivalent
])

# Define Data
data = [(1, "Jai", 25, 5000),
        (2, "Kumar", 30, 6000),
        (3, "Sam", 35, 7000),
        (4, "Hari", 29, 5500)]

In [None]:
spark = spark.createDataFrame(data, schema)

In [None]:
spark.show()

+---+-----+---+------+
| ID| Name|Age|Salary|
+---+-----+---+------+
|  1|  Jai| 25|  5000|
|  2|Kumar| 30|  6000|
|  3|  Sam| 35|  7000|
|  4| Hari| 29|  5500|
+---+-----+---+------+



In [None]:
spark.printSchema()

root
 |-- ID: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Salary: long (nullable = true)



In [None]:
spark.columns

['ID', 'Name', 'Age', 'Salary']

In [None]:
spark.dtypes

[('ID', 'bigint'), ('Name', 'string'), ('Age', 'int'), ('Salary', 'bigint')]

In [None]:
spark = spark.withColumn("Bonus", spark["Salary"] * 0.2)

In [None]:
spark.show()

+---+-----+------+------+------+
| ID| Name|Age_no|Salary| Bonus|
+---+-----+------+------+------+
|  1|  Jai|    25|  5000|1000.0|
|  2|Kumar|    30|  6000|1200.0|
|  3|  Sam|    35|  7000|1400.0|
|  4| Hari|    29|  5500|1100.0|
+---+-----+------+------+------+



In [None]:

spark.select("Name","Age").show()

+-----+---+
| Name|Age|
+-----+---+
|  Jai| 25|
|Kumar| 30|
|  Sam| 35|
| Hari| 29|
+-----+---+



In [None]:
df.show()

+---+-------+---+------+-----+
| ID|   Name|Age|Salary|Bonus|
+---+-------+---+------+-----+
|  1|  Alice| 25|  5000|500.0|
|  2|    Bob| 30|  6000|600.0|
|  3|Charlie| 35|  7000|700.0|
+---+-------+---+------+-----+



In [None]:
spark.show()

+---+-----+---+------+------+
| ID| Name|Age|Salary| Bonus|
+---+-----+---+------+------+
|  1|  Jai| 25|  5000|1000.0|
|  2|Kumar| 30|  6000|1200.0|
|  3|  Sam| 35|  7000|1400.0|
|  4| Hari| 29|  5500|1100.0|
+---+-----+---+------+------+



In [None]:
spark = spark.withColumnRenamed("Age", "Age_no")

In [None]:
spark.show()

+---+-----+------+------+
| ID| Name|Age_no|Salary|
+---+-----+------+------+
|  1|  Jai|    25|  5000|
|  2|Kumar|    30|  6000|
|  3|  Sam|    35|  7000|
|  4| Hari|    29|  5500|
+---+-----+------+------+



In [None]:
spark.filter(spark.Age_no >= 30).show()

+---+-----+------+------+------+
| ID| Name|Age_no|Salary| Bonus|
+---+-----+------+------+------+
|  2|Kumar|    30|  6000|1200.0|
|  3|  Sam|    35|  7000|1400.0|
+---+-----+------+------+------+



In [None]:
spark.select(col("Name")).show()

+-----+
| Name|
+-----+
|  Jai|
|Kumar|
|  Sam|
| Hari|
+-----+



In [None]:
spark_session = SparkSession.builder.appName("Basic").getOrCreate()
new_data = [(5, "Alice", 34, 5500)]
new_df = spark_session.createDataFrame(new_data, schema=schema)


In [None]:
new_data = [(5, "Alice", 34, 5500)]
new_df = spark_session.createDataFrame(new_data, schema=schema)

In [None]:
new_df.show()

+---+-----+---+------+
| ID| Name|Age|Salary|
+---+-----+---+------+
|  5|Alice| 34|  5500|
+---+-----+---+------+



In [None]:
new_df = new_df.withColumn("Bonus", new_df["Salary"] * 0.2)
new_df = new_df.withColumnRenamed("Age", "Age_no")
new_df.show()

+---+-----+------+------+------+
| ID| Name|Age_no|Salary| Bonus|
+---+-----+------+------+------+
|  5|Alice|    34|  5500|1100.0|
+---+-----+------+------+------+



In [None]:
spark = spark.union(new_df)

In [None]:
spark.show()

+---+-----+------+------+------+
| ID| Name|Age_no|Salary| Bonus|
+---+-----+------+------+------+
|  1|  Jai|    25|  5000|1000.0|
|  2|Kumar|    30|  6000|1200.0|
|  3|  Sam|    35|  7000|1400.0|
|  4| Hari|    29|  5500|1100.0|
|  5|Alice|    34|  5500|1100.0|
+---+-----+------+------+------+



In [None]:
from pyspark.sql.functions import when, col # Import the 'when' function

spark = spark.withColumn(
    "City",
    when(col("Age_no") < 27, "New York")
    .when(col("Age_no") < 32, "Los Angeles")
    .when(col("Age_no") < 36, "Chicago")
    .otherwise("Houston")
)

In [None]:
spark.show()

+---+-----+------+------+------+-----------+
| ID| Name|Age_no|Salary| Bonus|       City|
+---+-----+------+------+------+-----------+
|  1|  Jai|    25|  5000|1000.0|   New York|
|  2|Kumar|    30|  6000|1200.0|Los Angeles|
|  3|  Sam|    35|  7000|1400.0|    Chicago|
|  4| Hari|    29|  5500|1100.0|Los Angeles|
|  5|Alice|    34|  5500|1100.0|    Chicago|
+---+-----+------+------+------+-----------+



In [None]:
spark.filter(spark.Age_no >= 30).show()

+---+-----+------+------+------+-----------+
| ID| Name|Age_no|Salary| Bonus|       City|
+---+-----+------+------+------+-----------+
|  2|Kumar|    30|  6000|1200.0|Los Angeles|
|  3|  Sam|    35|  7000|1400.0|    Chicago|
|  5|Alice|    34|  5500|1100.0|    Chicago|
+---+-----+------+------+------+-----------+



In [None]:
spark.filter(spark.City == "New York").show()

+---+----+------+------+------+--------+
| ID|Name|Age_no|Salary| Bonus|    City|
+---+----+------+------+------+--------+
|  1| Jai|    25|  5000|1000.0|New York|
+---+----+------+------+------+--------+



In [None]:
spark.where((spark.Age_no > 30) & (spark.City == "Chicago")).show()

AttributeError: 'NoneType' object has no attribute 'where'

In [None]:
from pyspark.sql.functions import sum,avg,count,min,max

Asc and Desc order

In [None]:
from pyspark.sql.functions import asc, desc # Import asc and desc, not sort


In [None]:
spark = spark.sort(col("Age_no").desc()).show() # Sort the 'spark' DataFrame, which has the 'Age_no' column # Sort and store the result

AttributeError: 'NoneType' object has no attribute 'sort'

In [None]:
spark.show()

AttributeError: 'NoneType' object has no attribute 'show'

In [None]:
spark = spark.sort(spark.Age.desc()).show()

AttributeError: 'NoneType' object has no attribute 'sort'

In [None]:
spark = spark.sort(col("Age").asc()).show()

+---+-----+---+------+
| ID| Name|Age|Salary|
+---+-----+---+------+
|  1|  Jai| 25|  5000|
|  4| Hari| 29|  5500|
|  2|Kumar| 30|  6000|
|  3|  Sam| 35|  7000|
+---+-----+---+------+



In [None]:
from pyspark.sql.functions import sum,avg,count,min,max
from pyspark.sql.functions import avg

In [None]:
spark.show()

+---+-----+------+------+------+-----------+
| ID| Name|Age_no|Salary| Bonus|       City|
+---+-----+------+------+------+-----------+
|  1|  Jai|    25|  5000|1000.0|   New York|
|  2|Kumar|    30|  6000|1200.0|Los Angeles|
|  3|  Sam|    35|  7000|1400.0|    Chicago|
|  4| Hari|    29|  5500|1100.0|Los Angeles|
|  5|Alice|    34|  5500|1100.0|    Chicago|
+---+-----+------+------+------+-----------+



In [None]:
spark.select(avg("Salary")).show()

+-----------+
|avg(Salary)|
+-----------+
|     5800.0|
+-----------+



In [None]:
spark.select(sum("Salary")).show()

+-----------+
|sum(Salary)|
+-----------+
|      29000|
+-----------+



In [None]:
spark.select(min("Age_no"),max("Age_no") ).show()

+-----------+-----------+
|min(Age_no)|max(Age_no)|
+-----------+-----------+
|         25|         35|
+-----------+-----------+



In [None]:
spark.groupBy("City").count().show()

+-----------+-----+
|       City|count|
+-----------+-----+
|Los Angeles|    2|
|   New York|    1|
|    Chicago|    2|
+-----------+-----+



In [None]:
spark.groupBy("City").agg(avg("Salary")).show()

+-----------+-----------+
|       City|avg(Salary)|
+-----------+-----------+
|Los Angeles|     5750.0|
|   New York|     5000.0|
|    Chicago|     6250.0|
+-----------+-----------+



In [None]:
spark.groupBy("City").sum("Salary").show()

+-----------+-----------+
|       City|sum(Salary)|
+-----------+-----------+
|Los Angeles|      11500|
|   New York|       5000|
|    Chicago|      12500|
+-----------+-----------+



In [None]:
spark.groupBy("City").agg(avg("Salary").alias("Avg_Salary")).show()

+-----------+----------+
|       City|Avg_Salary|
+-----------+----------+
|Los Angeles|    5750.0|
|   New York|    5000.0|
|    Chicago|    6250.0|
+-----------+----------+



Functions in pyspark
1.String functions
2.Numeric Functions
3.Date & Time Functions
4.Conditional Functions
5.Windows Functions
6. Agg functions

In [None]:
from pyspark.sql.functions import lower,upper,length,trim,regexp_replace

In [None]:
data = [("jay ",),("kumar ",),("mithu ",)]
function = spark_session.createDataFrame(data, ["name"])
function.select (
    col("name"),
    lower(col("name")).alias("Lowercase"),
    upper(col("name")).alias("Uppercase"),
    length(col("name")).alias("Length"),
    trim(col("name")).alias("Trimmed"),
    regexp_replace(col("name"), " ", "_").alias("Replaced")
    ).show()

+------+---------+---------+------+-------+--------+
|  name|Lowercase|Uppercase|Length|Trimmed|Replaced|
+------+---------+---------+------+-------+--------+
|  jay |     jay |     JAY |     4|    jay|    jay_|
|kumar |   kumar |   KUMAR |     6|  kumar|  kumar_|
|mithu |   mithu |   MITHU |     6|  mithu|  mithu_|
+------+---------+---------+------+-------+--------+



Date Function


In [None]:
from pyspark.sql.functions import current_date, current_timestamp, date_add, datediff,date_sub

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, current_timestamp, date_add, date_sub, datediff, lit

# Create Spark session
spark_session = SparkSession.builder.appName("DateFunctions").getOrCreate()

# Create a dummy DataFrame
date = spark_session.createDataFrame([(1,)], ["id"])

# Select date functions
date.select(
    current_date().alias("Current_Date"),
    current_timestamp().alias("Current_Timestamp"),
    date_add(current_date(), 10).alias("Date_Add"),   # Adds 10 days
    date_sub(current_date(), 5).alias("Date_Sub"),    # Subtracts 5 days
    datediff(current_date(), lit("2023-01-01")).alias("Date_Diff")  # Days difference
).show()


+------------+--------------------+----------+----------+---------+
|Current_Date|   Current_Timestamp|  Date_Add|  Date_Sub|Date_Diff|
+------------+--------------------+----------+----------+---------+
|  2025-02-13|2025-02-13 08:49:...|2025-02-23|2025-02-08|      774|
+------------+--------------------+----------+----------+---------+

