<a href="https://colab.research.google.com/github/Abhishekpamulapati/PortfolioProjects/blob/main/Pyspark/Basic_Introduction_to_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Read a csv file and set the headers
from pprint import pprint

from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()

# Single source of truth for the ratings file read twice below.
RATINGS_PATH = "/home/repl/workspace/mnt/data_lake/landing/ratings.csv"

# Without a schema (and with inferSchema left at its default), every column
# comes back as a string.
df = (spark.read
      .options(header="true")
      .csv(RATINGS_PATH))

df.show()

# Define the schema explicitly so numeric columns are typed on read.
# NOTE(review): Spark file sources generally relax nullability when reading,
# so nullable=False below may not actually be enforced — confirm for the
# Spark version in use.
schema = StructType([
    StructField("brand", StringType(), nullable=False),
    StructField("model", StringType(), nullable=False),
    StructField("absorption_rate", IntegerType(), nullable=True),
    StructField("comfort", IntegerType(), nullable=True)
])

better_df = (spark
             .read
             .options(header="true")
             # Pass the predefined schema to the Reader
             .schema(schema)
             .csv(RATINGS_PATH))

# `pprint` comes from the standard library and must be imported explicitly
# (it was previously undefined, so this line raised NameError on a fresh kernel).
pprint(better_df.dtypes)

In [None]:
# Specify the option to drop invalid rows.
# The explicit `schema` (defined in the first cell) is essential here:
# - Without it, every column is read as a string. Any value then "parses",
#   so DROPMALFORMED cannot reject rows with e.g. text in a numeric column.
# - fillna(4, ...) only fills columns whose type matches the fill value.
#   On an untyped (string) "comfort" column it would silently do nothing.
# NOTE(review): assumes this file has the same four columns, in the same order,
# as ratings.csv — TODO confirm.
ratings_raw = (spark
               .read
               .options(header=True, mode="DROPMALFORMED")
               .schema(schema)
               .csv("/home/repl/workspace/mnt/data_lake/landing/ratings_with_invalid_rows.csv"))

print("BEFORE")
ratings_raw.show()

print("AFTER")
# Replace nulls with an arbitrary value on a column subset. Deriving `ratings`
# from `ratings_raw` (rather than overwriting `ratings` with itself) keeps the
# cell idempotent when re-run.
ratings = ratings_raw.fillna(4, subset=["comfort"])
ratings.show()

In [None]:
## Conditionally replacing values

from pyspark.sql.functions import col, when

# Ratings above 3 become "sufficient". Every other row becomes "insufficient";
# this includes null ratings, because a null comparison does not satisfy `when`.
comfort_label = (when(col("comfort") > 3, "sufficient")
                 .otherwise("insufficient"))

# Overwrite the "comfort" column with its text label.
categorized_ratings = ratings.withColumn("comfort", comfort_label)

categorized_ratings.show()

In [None]:
## Selecting and renaming columns

from pyspark.sql.functions import col

# Keep brand and model unchanged; expose "absorption_rate" as "absorbency".
projected_columns = [
    col("brand"),
    col("model"),
    col("absorption_rate").alias("absorbency"),
]
result = ratings.select(projected_columns)

# Display only the distinct rows.
result.distinct().show()



In [None]:
## Grouping and aggregating the data

from pyspark.sql.functions import col, avg, stddev_samp, max as sfmax

# `purchased` was never defined in this notebook; it was pre-loaded in the
# original course environment. On a fresh kernel this cell therefore raised
# NameError. Build a small in-memory frame with the two columns the
# aggregation needs.
# NOTE(review): the values below are illustrative placeholders — replace them
# with the real data source if one is available.
purchased = spark.createDataFrame(
    [
        ("France", 72000.0),
        ("Spain", 48000.0),
        ("Germany", 54000.0),
        ("Spain", 61000.0),
        ("Germany", 58000.0),
        ("France", 52000.0),
    ],
    ["Country", "Salary"],
)

aggregated = (purchased
              # Group rows by 'Country'
              .groupBy(col('Country'))
              .agg(
                  # Average salary per group, renamed
                  avg('Salary').alias('average_salary'),
                  # Sample standard deviation per group, renamed like the
                  # other aggregates (otherwise the column is auto-named
                  # after the expression)
                  stddev_samp('Salary').alias('standard_deviation'),
                  # Highest salary per group, renamed
                  sfmax('Salary').alias('highest_salary'),
              ))

aggregated.show()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

from pydiaper.data_catalog.catalog import catalog


def main():
    """Copy the landing-zone prices CSV into the clean zone as Parquet.

    The CSV is read with an explicit schema. Input and output locations are
    looked up in the project's `catalog` mapping under "landing/prices" and
    "clean/prices". Any existing output at the clean location is overwritten.
    """
    spark = SparkSession.builder.getOrCreate()

    # (column name, Spark type, nullable) for every column in the CSV.
    price_columns = [
        ("store", StringType(), False),
        ("countrycode", StringType(), False),
        ("brand", StringType(), False),
        ("model", StringType(), True),
        ("price", FloatType(), False),
        ("currency", StringType(), True),
        ("quantity", IntegerType(), True),
        ("date", DateType(), False),
    ]
    prices_schema = StructType(
        [StructField(name, data_type, nullable)
         for name, data_type, nullable in price_columns]
    )

    prices = (spark.read
              .options(header="true")
              .schema(prices_schema)
              .csv(catalog["landing/prices"]))

    # This dataset needs no further cleaning before it is written out.
    # Force exactly 2 output partitions; this is needed for the DataCamp
    # multiple-choice question.
    prices_writer = prices.repartition(2).write
    prices_writer.parquet(catalog["clean/prices"], mode="overwrite")


# In a notebook kernel __name__ is "__main__", so running this cell executes the job.
if __name__ == "__main__":
    main()


Testing the pipelines in Python

In [None]:
## Creating in-memory DataFrames for reusability and for testing DataFrame pipelines
from datetime import date
from pyspark.sql import Row

# Row-like class: calling Record(...) produces a Row carrying these field names.
Record = Row("country", "utm_campaign", "airtime_in_minutes", "start_date", "end_date")

# Plain value tuples, in Record field order; the German row has no end_date.
campaign_values = [
    ("USA", "DiapersFirst", 28, date(2017, 1, 20), date(2017, 1, 27)),
    ("Germany", "WindelKind", 31, date(2017, 1, 25), None),
    ("India", "CloseToCloth", 32, date(2017, 1, 25), date(2017, 2, 2)),
]

# Wrap each tuple in a Record to get a tuple of Rows.
data = tuple(Record(*values) for values in campaign_values)

# Column names come from the Row fields; column types are inferred from the values.
frame = spark.createDataFrame(data)
frame.show()