# PySpark in Google Colab

Setting up PySpark in Colab takes more than a single `pip install`. Spark runs on the JVM, so you first need to install its dependencies: **Java 17** and **Apache Spark 3.5.1**.

In [None]:
# Install dependencies
!apt-get install openjdk-17-jdk-headless -qq > /dev/null
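# Older Spark releases are served from archive.apache.org; dlcdn.apache.org keeps only current releases
!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz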
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

The next step is to set up the environment variables. This ensures that the Colab environment can correctly locate and use the installed dependencies.

In [None]:
# Configure environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

# Make PySpark importable
import findspark
findspark.init('/content/spark-3.5.1-bin-hadoop3')
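
Before starting a session, it can help to confirm that both variables point at real directories. The helper below is a minimal sketch; `check_env_dirs` is a hypothetical name, not part of PySpark or findspark.

```python
import os
from pathlib import Path


def check_env_dirs(names=("JAVA_HOME", "SPARK_HOME")):
    """Return {variable: problem} for variables that are unset or not a directory."""
    problems = {}
    for name in names:
        value = os.environ.get(name)
        if not value:
            problems[name] = "not set"
        elif not Path(value).is_dir():
            problems[name] = f"not a directory: {value}"
    return problems


# An empty dict means both variables point at existing directories
print(check_env_dirs() or "JAVA_HOME and SPARK_HOME look fine")
```

If a variable is reported, the usual cause is a path that does not match the installed Java or Spark version.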

With everything set up, let's run a local session to test if the installation worked correctly.

In [None]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Introduction").getOrCreate()

# Test the Spark session
df = spark.createDataFrame([(1, 'foo'), (2, 'bar')], ['id', 'label'])
df.show()

## **1 Using Spark with Python**

The first step in using Spark is to connect to a cluster.

In practice, the cluster is hosted on remote machines. One primary machine, known as the master, is connected to all other nodes and distributes data and tasks across the cluster. In this notebook no cluster is configured, so Spark runs in local mode on the Colab machine.

### **1.1 Creating a SparkSession**

Creating multiple `SparkSession` and `SparkContext` objects can lead to issues, so it is best practice to use the `SparkSession.builder.getOrCreate()` method. It returns the existing `SparkSession` if there is one and creates a new one otherwise.

In [None]:
# Start a local session
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Introduction").getOrCreate()

In [None]:
# Verify the SparkSession
print(spark)

# Print Spark version
print(spark.version)

### **1.2 Using DataFrames**

The core data structure in Spark is the Resilient Distributed Dataset (RDD). This is a low-level object that lets Spark split data across the nodes of the cluster and process the pieces in parallel.

The Spark DataFrame is designed to behave similarly to a SQL table (with variables in columns and observations in rows). Not only are DataFrames easier to understand, but they are also more optimized for performance.

When you start modifying and combining columns and rows of data, there are many ways to achieve the same result, and some are significantly slower than others. With RDDs, it is up to you to find the efficient way to write the query. DataFrame operations go through Spark's Catalyst optimizer, which does much of that work for you.

To begin working with Spark DataFrames, you need a `SparkSession`. `SparkSession.builder.getOrCreate()` creates or reuses the underlying `SparkContext`, which is available as `spark.sparkContext`. You can think of the `SparkContext` as your connection to the cluster and the `SparkSession` as your interface for interacting with Spark.

In [None]:
from pyspark.sql import SparkSession
# Wildcard import used for brevity; it shadows Python built-ins such as round, sum, min, and max
from pyspark.sql.functions import *

# Initialize SparkSession with a specific application name
spark = SparkSession.builder.appName("sp-functions").getOrCreate()

# Read CSV files from a directory, with headers and inferred schema
# (this is a local Windows path; in Colab, point it at wherever your CSV files are)
df = spark.read.csv(r"C:\Users\alefr\PycharmProjects\series-spark\data\*.csv", header=True, inferSchema=True)

# Show the first few rows of the dataframe
df.show()

### DataFrame Operations

In [None]:
# Select specific columns using `select`
df.select("cnpj_raiz", "socios", "atualizado_em").show()

In [None]:
# Select columns using `select` with column objects
df.select(col("cnpj_raiz"), col("socios")).show()

In [None]:
# Select a constant column using `lit` (the result contains only `valid_row`)
df.select(lit(1).alias("valid_row")).show()

In [None]:
# Add a calculated column using `withColumn` and a conditional expression
df.withColumn("valid_row", when(col("cnpj_raiz").isNull(), lit(0)).otherwise(lit(1))).show(5)

In [None]:
# Generate unique, increasing (but not necessarily consecutive) IDs using `monotonically_increasing_id`
df.select(monotonically_increasing_id().alias("id"), "cnpj_raiz").show(5)

### Additional Examples

In [None]:
# Create a new DataFrame with a schema
df_dados = spark.createDataFrame([
    ("a", 1, 3, 4),
    ("b", 2, 4, 5),
    ("c", 3, 5, 6)
], schema='a string, b int, c int, d int')

In [None]:
# Find the greatest value among columns
df_dados.select(
    "b",
    "c",
    "d",
    greatest("b", "c", "d").alias("greatest_value")
).show()

In [None]:
# Use SQL-like expressions in `select`
df.select("cnpj_raiz", expr("CASE WHEN LENGTH(cnpj_raiz) = 8 THEN 1 ELSE 0 END").alias("flag")).show(5)

In [None]:
# Round numerical values to a specified number of decimal places
# (`round` here is pyspark.sql.functions.round from the wildcard import, not Python's built-in)
df_dados.select(round("b", 2).alias("b_rounded")).show()

In [None]:
# Display current date and timestamp
df.select(current_date(), current_timestamp()).show()

### Transformations

In [None]:
# Define a function to transform a DataFrame
def make_upper(df):
    return df.withColumn("a", upper(col("a")))

# Apply the transformation function
df_dados.transform(make_upper).show()

### Aggregations

In [None]:
# Calculate the average of a column
df_dados.select(avg("b")).show()

### Filtering and Grouping

In [None]:
# Filter rows based on a condition
df.filter(df["cnpj_raiz"] == "2421421").show()

In [None]:
# Group by a column and count occurrences
df_cnpj_por_data = df.groupBy("atualizado_em").count()
df_cnpj_por_data.show()

In [None]:
# Write the grouped DataFrame as a Parquet table
df_cnpj_por_data.write.mode("overwrite").format("parquet").saveAsTable("cnpj_por_data")