In [0]:
#1. Create a PySpark application
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, when
# Reuse the active session if there is one; otherwise start one named "Spark DataFrames".
spark = (
    SparkSession.builder
    .appName("Spark DataFrames")
    .getOrCreate()
)


In [0]:
# 2. Import the type classes used to describe a schema
from pyspark.sql.types import (
    ArrayType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# 3. Build the schema: three nullable columns, listed as (name, type) pairs
schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("id", IntegerType()),
            ("name", StringType()),
            ("scores", IntegerType()),
        )
    ]
)

# Rows as (id, name, scores) tuples, in the same order as the schema fields
data = [(1, "Alice", 85), (2, "Bob", 90), (3, "Charlie", 78)]

# 4. Create the DataFrame with the explicit schema instead of letting Spark infer one
df = spark.createDataFrame(data, schema=schema)


In [0]:
# Display the DataFrame's rows, then its column names and types
df.show()
df.printSchema()

In [0]:
# Schema for the census extract: two integer columns followed by three string columns.
# Rebinds `schema`; the DataFrame created earlier keeps the schema it was built with.
schema = StructType(
    [StructField(name, IntegerType()) for name in ("age", "education_num")]
    + [
        StructField(name, StringType())
        for name in ("marital_status", "occupation", "income")
    ]
)

In [0]:
# Read the comma-separated census file (no header row) using the explicit schema
census_adult = spark.read.csv(
    "dbfs:/path/to/your/file/adult_reduced_100.csv",
    schema=schema,
    sep=',',
    header=False,
)

# Inspect the loaded rows and the resulting column types
census_adult.show()
census_adult.printSchema()

In [0]:
# NOTE(review): the original cell ran these on `df`, whose schema (id, name, scores)
# has no "age" column; `census_adult` is the DataFrame that was loaded with one.

# Sort by the age column, highest age first
census_adult.sort("age", ascending=False).show()

# Drop rows that contain missing values
census_adult.na.drop().show()

# Fill missing values: once with the number 0, once with the string "unknown"
# (the original repeated the identical fill(0) call a second time; shown once here)
census_adult.na.fill(0).show()
census_adult.na.fill("unknown").show()




In [0]:
# NOTE(review): the original cell used `df`, whose schema (id, name, scores) has no
# "age" column; `census_adult` is the DataFrame that was loaded with one.
# Each assignment below is an alternative cleaning strategy; `df_cleaned` ends up
# holding the result of the last one.

# Drop rows with any nulls
df_cleaned = census_adult.na.drop()

# Keep only rows whose age is not null
# (fixed: the original had an unbalanced parenthesis, which is a SyntaxError)
df_cleaned = census_adult.where(col("age").isNotNull())

# Replace nulls with a specific value
df_cleaned = census_adult.na.fill(0)

# Fill nulls in the age column only with the value 0
df_filled = census_adult.na.fill({"age": 0})
df_filled.show()


In [0]:
# Add a derived column: "young" when age < 30, "old" in every other case
# NOTE(review): assumes `df` has an "age" column - confirm which DataFrame this cell targets
df = df.withColumn(
    "age_group",
    when(col("age") < 30, "young").otherwise("old"),
)
df.show()

In [0]:
# NOTE(review): every statement here assumes `df` has "age", "name" and "id" columns - confirm.

# Renaming a column. The result is bound to a new name: the original rebound `df`,
# after which "age" no longer existed for the statements below.
df_renamed = df.withColumnRenamed("age", "age_new")
df_renamed.show()

# Dropping a column (again without rebinding `df`)
df_without_age = df.drop("age")
df_without_age.show()

# Selecting columns
df.select("age", "name").show()

# Selecting rows
df.filter(col("age") > 30).show()

# Selecting rows and columns
df.filter(col("age") > 30).select("age", "name").show()

# Group by
df.groupBy("age").count().show()

# Group by and aggregate
df.groupBy("age").agg(avg("age")).show()

# Join
# NOTE(review): `df2` is not defined in any visible cell - it must exist before this runs.
df.join(df2, df.id == df2.id, "inner").show()

# Save the dataframe as CSV
# NOTE(review): a re-run may fail if the output path already exists - confirm the intended save mode.
df.write.csv("dbfs:/path/to/your/file/output.csv")

In [0]:
# NOTE(review): assumes `df` has "age", "name" and "id" columns - confirm.

# Filtering rows
df.filter(col("age") > 30).show()

# Selecting columns
df.select("age", "name").show()

# Grouping rows
df.groupBy("age").count().show()

# Joining two dataframes
# (fixed: the original ended in `.show` without parentheses, so nothing was displayed)
# NOTE(review): `df2` is not defined in any visible cell - it must exist before this runs.
df.join(df2, df.id == df2.id, "inner").show()

In [0]:
# NOTE(review): assumes `df` has "category", "price", "value_column", "age", "name"
# and "id" columns - confirm.

# Grouping rows: average via a column function, then via the dict form of agg()
df.groupBy("category").agg(avg("price")).show()
# (the original repeated this assignment and show() a second time; kept once)
df_grouped = df.groupby("category").agg({"value_column": "avg"})
df_grouped.show()

# Filtering rows
df.filter(col("age") > 30).show()

# Selecting columns
# (fixed: a stray quote before this comment opened an unterminated string literal)
df.select("age", "name").show()

# Joining two dataframes
# (fixed: the original ended in `.show` without parentheses, so nothing was displayed)
# NOTE(review): `df2` is not defined in any visible cell - it must exist before this runs.
df.join(df2, df.id == df2.id, "inner").show()


In [0]:
# Stack the rows of two DataFrames
# NOTE(review): `df1` and `df2` are not defined in any visible cell - they must exist before this runs.
df_union = df1.union(df2)
df_union.show()

# Fixed: the original statement stopped mid-expression (SyntaxError). It is completed
# here to match the identical age_group statement used earlier in the notebook.
df = df.withColumn("age_group", when(col("age") < 30, "young").otherwise("old"))

In [0]:
# Working with arrays and maps
# Fixes to the original import line: "array    concat" was missing its comma,
# "array_zip" is spelled `arrays_zip` in pyspark.sql.functions, and `array_sort`
# was listed twice.
from pyspark.sql.functions import (
    array,
    array_contains,
    array_distinct,
    array_except,
    array_intersect,
    array_position,
    array_remove,
    array_repeat,
    array_sort,
    array_union,
    arrays_zip,
    concat,
    explode,
    map_entries,
    map_from_arrays,
    map_keys,
    map_values,
    split,
)

In [0]:
from pyspark.sql.functions import array, lit

# Add a constant array column. The array is assembled from one literal per element,
# because passing a Python list straight to lit() is only accepted by newer PySpark releases.
df = df.withColumn("array_column", array(*[lit(value) for value in [1, 2, 3]]))
df.show()

In [0]:
from pyspark.sql.functions import create_map, lit

# Add a constant map column. lit() does not accept a Python dict, so the map is built
# with create_map(), which takes alternating key and value columns.
df = df.withColumn(
    "map_column",
    create_map(lit("key1"), lit("value1"), lit("key2"), lit("value2")),
)
df.show()

In [0]:
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf("float")
def calculate_squares(x: pd.Series) -> pd.Series:
    """Return the element-wise square of a pandas Series.

    Registered as a pandas UDF whose declared return type is "float".
    (Fixed: the original signature contained a stray `")`, which is a SyntaxError.)

    Args:
        x: Batch of input values as a pandas Series.

    Returns:
        A Series holding each input value raised to the power 2.
    """
    return x**2
# Apply the UDF once (the original repeated the identical withColumn/show pair).
# NOTE(review): the UDF is declared to return "float" but is fed "array_column" -
# confirm which column is the intended input.
df = df.withColumn("squares", calculate_squares(df["array_column"]))
df.show()

In [0]:
from pyspark.sql.functions import pandas_udf
import pandas as pd
# NOTE(review): this cell repeats the previous cell's imports and definition;
# consider keeping only one copy.
@pandas_udf("float")
def calculate_squares(x: pd.Series) -> pd.Series:
    """Return the element-wise square of a pandas Series.

    Registered as a pandas UDF whose declared return type is "float".
    (Fixed: the original signature contained a stray `")`, which is a SyntaxError.)

    Args:
        x: Batch of input values as a pandas Series.

    Returns:
        A Series holding each input value raised to the power 2.
    """
    return x**2
# Apply the UDF once (the original repeated the identical withColumn/show pair).
# NOTE(review): the UDF is declared to return "float" but is fed "array_column" -
# confirm which column is the intended input.
df = df.withColumn("squares", calculate_squares(df["array_column"]))
df.show()

In [0]:
def fahrenheit_to_celsius(fahrenheit):
    """Convert a temperature from degrees Fahrenheit to degrees Celsius.

    Only the arithmetic operators -, * and / are applied to the argument, so any
    object that supports them can be passed.

    Args:
        fahrenheit: Temperature in degrees Fahrenheit.

    Returns:
        The result of (fahrenheit - 32) * 5 / 9.
    """
    degrees_above_freezing = fahrenheit - 32
    return degrees_above_freezing * 5 / 9
# Add a "celsius" column computed from the "temperature" column
# NOTE(review): assumes `df` has a "temperature" column holding Fahrenheit values - confirm
df = df.withColumn(
    "celsius",
    fahrenheit_to_celsius(df["temperature"]),
)
df.show()