<a href="https://colab.research.google.com/github/IsfaquethedataAnalyst/Data_Analyst/blob/main/SparkSession_Unified_entry_point_for_DataFrame_and_SQL_operations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("PySpark Basics").getOrCreate()

# Create a DataFrame
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Show the DataFrame
df.show()

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+



In [5]:
# From a list of tuples
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])

# From a Pandas DataFrame
import pandas as pd
pandas_df = pd.DataFrame({"Name": ["Alice", "Bob", "Charlie"], "Age": [25, 30, 35]})
spark_df = spark.createDataFrame(pandas_df)

In [6]:
# Show the DataFrame
df.show()

# Print the schema
df.printSchema()

# Select specific columns
df.select("Name").show()

# Filter rows
df.filter(df.Age > 30).show()

# Add a new column
from pyspark.sql.functions import col
df = df.withColumn("AgeNextYear", col("Age") + 1)

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)

+-------+
|   Name|
+-------+
|  Alice|
|    Bob|
|Charlie|
+-------+

+-------+---+
|   Name|Age|
+-------+---+
|Charlie| 35|
+-------+---+



In [7]:
df.createOrReplaceTempView("people")

In [8]:
result = spark.sql("SELECT * FROM people WHERE Age > 30")
result.show()

+-------+---+-----------+
|   Name|Age|AgeNextYear|
+-------+---+-----------+
|Charlie| 35|         36|
+-------+---+-----------+



In [9]:
from pyspark.sql.functions import avg
avg_age = df.select(avg("Age")).collect()[0][0]
print(f"Average age: {avg_age}")

Average age: 30.0


In [11]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, dense_rank, rank

# Assuming you want to partition by 'Name' and order by 'Age' as a proxy
windowSpec = Window.partitionBy("Name").orderBy("Age")

df = df.withColumn("row_number", row_number().over(windowSpec)) \
       .withColumn("dense_rank", dense_rank().over(windowSpec)) \
       .withColumn("rank", rank().over(windowSpec))

df.show()

+-------+---+-----------+----------+----------+----+
|   Name|Age|AgeNextYear|row_number|dense_rank|rank|
+-------+---+-----------+----------+----------+----+
|  Alice| 25|         26|         1|         1|   1|
|    Bob| 30|         31|         1|         1|   1|
|Charlie| 35|         36|         1|         1|   1|
+-------+---+-----------+----------+----------+----+



In [21]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, dense_rank, rank

# Assuming 'Name' and 'Age' are the relevant columns for your ranking
windowSpec = Window.partitionBy("Name").orderBy("Age")

df = df.withColumn("row_number", row_number().over(windowSpec)) \
       .withColumn("dense_rank", dense_rank().over(windowSpec)) \
       .withColumn("rank", rank().over(windowSpec))

In [30]:
from pyspark.sql.functions import sum, avg, max, min, count

# Verify the column names in your DataFrame
print(df.columns)

# Choose the correct column name to group by.
# If there's no column to group by, remove the groupBy clause entirely.
result = df.groupBy("Name") \
           .agg(sum("Age").alias("total_age"),  # Replace 'Age' with the column you want to aggregate
                avg("Age").alias("avg_age"),
                max("Age").alias("max_age"), # Fixed indentation
                min("Age").alias("min_age"),
                count("*").alias("count"))

result.show()

['Name', 'Age', 'AgeNextYear', 'row_number', 'dense_rank', 'rank', 'age_category']
+-------+---------+-------+-------+-------+-----+
|   Name|total_age|avg_age|max_age|min_age|count|
+-------+---------+-------+-------+-------+-----+
|  Alice|       25|   25.0|     25|     25|    1|
|Charlie|       35|   35.0|     35|     35|    1|
|    Bob|       30|   30.0|     30|     30|    1|
+-------+---------+-------+-------+-------+-----+



In [31]:
employees = spark.createDataFrame([("Alice", "Sales"), ("Bob", "Engineering")], ["name", "department"])
departments = spark.createDataFrame([("Sales", "New York"), ("Engineering", "San Francisco")], ["department", "location"])

result = employees.join(departments, "department")

In [32]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def age_category(age):
    return 1 if age < 30 else 2 if age < 50 else 3

age_category_udf = udf(age_category, IntegerType())

df = df.withColumn("age_category", age_category_udf(df.Age))

In [33]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType

@pandas_udf(IntegerType())
def age_category(age: pd.Series) -> pd.Series:
    return pd.cut(age, bins=[0, 30, 50, 150], labels=[1, 2, 3], include_lowest=True)

df = df.withColumn("age_category", age_category(df.Age))