# PYSPARK TOOLKIT

## Import from PySpark

In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType  

## Initialize Spark Session

In [None]:
# Build the session for this notebook; getOrCreate() hands back an already
# running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("PySparkSQL")
    .getOrCreate()
)

# Sanity check: session class, Spark version and application name, one per line
print(
    type(spark),
    spark.version,
    spark.sparkContext.appName,
    sep="\n",
)

<class 'pyspark.sql.session.SparkSession'>
3.5.3
PySparkSQL


# Load Data into DataFrame

In [None]:
# pyspark.sql.DataFrameReader
#
# > Official documentation:
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.html

# Keep each dataset path in one place so the long literal is not repeated
# in every reader call.
csv_path = r"C:\Users\marco.SENNA-LAMOLHA\OneDrive\Documentos\PORTFOLIO\PYTHON\MRKTO_python\PYSPARK\DATASETS\data_csv.csv"

# TODO: placeholder paths -- the reader calls below previously received no
# path at all (json() raises TypeError without one). Point these at real
# JSON / Parquet files before running this cell.
json_path = r"C:\Users\marco.SENNA-LAMOLHA\OneDrive\Documentos\PORTFOLIO\PYTHON\MRKTO_python\PYSPARK\DATASETS\data_json.json"
parquet_path = r"C:\Users\marco.SENNA-LAMOLHA\OneDrive\Documentos\PORTFOLIO\PYTHON\MRKTO_python\PYSPARK\DATASETS\data_parquet.parquet"

# Load data into a DataFrame from a CSV file
df_csv = spark.read.csv(csv_path, header=True, inferSchema=True)
df_csv = spark.read.format("csv").load(csv_path, header=True, inferSchema=True)  # Alternative way

# Load data into a DataFrame from different data formats
df_json = spark.read.json(json_path)
df_parquet = spark.read.parquet(parquet_path)

# The following cells refer to the loaded data as `df`; bind it explicitly
# (previously `df` was used here without ever being defined).
df = df_csv

# Show the DataFrame
df.show()
df.printSchema()

# Basic DataFrame Operations
Perform basic operations on DataFrames such as selecting columns, filtering rows, and aggregating data.

In [None]:
# Projection: keep only the two columns of interest
df.select(["column1", "column2"]).show()

# Row filter: where() is an alias of filter(); keep rows with column1 above 100
df.where(df.column1 > 100).show()

# Aggregation: number of rows per distinct column3 value
df.groupby("column3").count().show()

# Same kind of aggregation through SQL: expose the DataFrame as a temp view,
# then query it by name
df.createOrReplaceTempView("table")
spark.sql("SELECT column1, COUNT(*) FROM table GROUP BY column1").show()

In PySpark, the `parallelize` method is used to create an RDD (Resilient Distributed Dataset) from a Python collection, such as a list or a set. RDDs are the fundamental data structure of Apache Spark, representing a distributed collection of objects that can be processed in parallel across a cluster of machines. The `parallelize` method is particularly useful when you want to distribute a local collection of data across the Spark cluster to leverage the parallel processing capabilities of Spark.

Here's a simple example to illustrate the use of `parallelize`:



In [None]:
# Build a DataFrame from an RDD of (name, id) tuples
rdd = spark.sparkContext.parallelize(
    [
        ("Alice", 1),
        ("Bob", 2),
        ("Cathy", 3),
    ]
)
# The list passed as schema supplies the column names, in tuple order
df = spark.createDataFrame(rdd, schema=["Name", "ID"])
df.show()

In [None]:
# Local Python collection to distribute: the integers 1 through 5
data = list(range(1, 6))

# Turn the local list into an RDD
rdd = spark.sparkContext.parallelize(data)

# collect() is an action: it brings the elements back to the driver as a list
print(rdd.collect())

In [None]:
# Cast the 'Company name' column to string.
# withColumn() returns a NEW DataFrame and leaves `df` untouched, so the
# result is kept in its own variable; otherwise printSchema() would report
# the schema from before the cast.
# NOTE(review): assumes `df` currently has a 'Company name' column -- confirm
# which dataset this cell is meant to run against.
df_cast = df.withColumn('Company name', df['Company name'].cast('string'))
df_cast.show()
df_cast.printSchema()

# SQL Queries on DataFrame
Use SQL queries to interact with DataFrames by registering them as temporary views.

In [None]:
# Make the DataFrame queryable by name from Spark SQL
df.createOrReplaceTempView("table")

# Run each query in turn and display its result:
#   1. select specific columns
#   2. filter rows based on a condition
#   3. group by a column and aggregate
for query in (
    "SELECT column1, column2 FROM table",
    "SELECT * FROM table WHERE column1 > 100",
    "SELECT column3, COUNT(*) as count FROM table GROUP BY column3",
):
    result = spark.sql(query)
    result.show()

# DataFrame Transformations
Apply transformations to DataFrames such as map, filter, and groupBy.

In [None]:
# DataFrame Transformations

# map: drop to the underlying RDD, pair column1 with twice column2 for each
# row, then convert back to a DataFrame with explicit column names
mapped_df = (
    df.rdd
    .map(lambda row: (row["column1"], row["column2"] * 2))
    .toDF(["column1", "column2_transformed"])
)
mapped_df.show()

# filter: keep rows with column1 above 100 (where() is an alias of filter())
filtered_df = df.where(df.column1 > 100)
filtered_df.show()

# groupBy: count the rows in each column3 group
grouped_df = df.groupby("column3").count()
grouped_df.show()