<a href="https://colab.research.google.com/github/abhishekmanjunatha/abhishekmanjunatha.github.io/blob/main/DataFrame_Selection_and_Derivation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
import urllib.request
import ssl
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import * # Importing common PySpark SQL functions
import sys

# --- Environment Setup (Crucial for PySpark to run correctly) ---
# Create directories for data and Hadoop binaries if they don't exist.
data_dir = "data"
os.makedirs(data_dir, exist_ok=True)

data_dir1 = "hadoop/bin"
os.makedirs(data_dir1, exist_ok=True)

# Map of source URL -> local destination path.
# winutils.exe and hadoop.dll are only needed when running Spark on Windows;
# they stay commented out because this setup targets Linux.
urls_and_paths = {
    "https://raw.githubusercontent.com/saiadityaus1/SparkCore1/master/test.txt": os.path.join(data_dir, "test.txt"),
    # "https://github.com/saiadityaus1/SparkCore1/raw/master/winutils.exe": os.path.join(data_dir1, "winutils.exe"),
    # "https://github.com/saiadityaus1/SparkCore1/raw/master/hadoop.dll": os.path.join(data_dir1, "hadoop.dll")
}

# NOTE(review): this context disables TLS certificate verification, so the
# downloaded bytes are not authenticated (man-in-the-middle risk). It is kept
# to preserve the original behaviour for environments with broken CA bundles;
# prefer ssl.create_default_context() wherever that works.
ssl_context = ssl._create_unverified_context()

# Seconds to wait on the network before a download attempt is abandoned.
DOWNLOAD_TIMEOUT = 60

# Download each file unless it is already present. Bytes are written to a
# temporary ".part" file and only renamed to the final name once the whole
# response has been written, so a failed download never leaves a truncated
# file that a later run would mistake for a complete one.
for url, path in urls_and_paths.items():
    if os.path.exists(path):
        print(f"File already exists: {os.path.basename(path)}")
        continue
    tmp_path = path + ".part"
    try:
        with urllib.request.urlopen(url, context=ssl_context, timeout=DOWNLOAD_TIMEOUT) as response, open(tmp_path, 'wb') as out_file:
            out_file.write(response.read())
        os.replace(tmp_path, path)  # move the completed file into place
        print(f"Downloaded: {os.path.basename(path)}")
    except Exception as e:
        # Best-effort: report and keep going, but never leave a partial file.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        print(f"Error downloading {os.path.basename(path)}: {e}")


# Set environment variables for PySpark and Hadoop.
# Run PySpark's Python workers with the same interpreter as this kernel.
python_path = sys.executable
os.environ['PYSPARK_PYTHON'] = python_path
# HADOOP_HOME may still be consulted by some Spark features. "hadoop" is a path
# relative to the current working directory. winutils.exe / hadoop.dll are not
# needed on Linux.
os.environ['HADOOP_HOME'] = "hadoop"

# JAVA_HOME must point at a JDK installation. A JAVA_HOME that is already
# exported in the environment is respected; otherwise fall back to the OpenJDK 11
# path below (typical for Debian/Ubuntu-based systems). Change the fallback if
# your JDK is installed elsewhere.
os.environ.setdefault('JAVA_HOME', '/usr/lib/jvm/java-11-openjdk-amd64')

# Optional: configuration for external Spark packages (enable at most one).
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.12:3.5.4 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4 pyspark-shell'

# --- Spark Session and Context Initialization ---
# Local Spark application using all available cores, with the driver bound to
# localhost and spark.default.parallelism set to 1.
conf = (
    SparkConf()
    .setAppName("pyspark_learning")
    .setMaster("local[*]")
    .set("spark.driver.host", "localhost")
    .set("spark.default.parallelism", "1")
)

# getOrCreate() returns the already-running SparkContext when this cell is
# re-executed, instead of raising "Cannot run multiple SparkContexts at once".
# Note: if a context already exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)

# SparkSession is the entry point for the DataFrame and SQL APIs; getOrCreate()
# picks up the active SparkContext rather than starting a second one.
spark = SparkSession.builder.getOrCreate()

print("Spark Session and Context initialized successfully!")

# Sample transaction data for the examples that follow. In a continuous
# workflow you would keep using the `df` built during Phase 1; it is rebuilt
# here so the example is self-contained.
columns = ["id", "tdate", "amount", "category", "product", "spendby"]

# Row layout matches `columns`; tdate is an MM-DD-YYYY string.
# The final row has no product (None becomes NULL in the DataFrame).
data = [
    ("00000", "06-26-2011", 200, "Exercise",    "GymnasticsPro", "cash"),
    ("00001", "05-26-2011", 300, "Exercise",    "Weightlifting", "credit"),
    ("00002", "06-01-2011", 100, "Exercise",    "GymnasticsPro", "cash"),
    ("00003", "06-05-2011", 100, "Gymnastics",  "Rings",         "credit"),
    ("00004", "12-17-2011", 300, "Team Sports", "Field",         "paytm"),
    ("00005", "02-14-2011", 200, "Gymnastics",  None,            "cash"),
]

# Column types are inferred from the Python values (e.g. amount -> long).
df = spark.createDataFrame(data, schema=columns)

print("------ORIGINAL DATAFRAME-------")
df.show()
df.printSchema()

# Derive new columns from `df` using SQL expression strings (selectExpr).
print("============Dataframe with derived columns (procdf with selectExpr)------------")
procdf = df.selectExpr(
    "id",
    # tdate is MM-DD-YYYY; Spark SQL's [] subscript is 0-based, so element 2
    # of the split is the year (kept as a string).
    "split(tdate, '-')[2] as year",
    "amount+100 as amount_increased",
    # NOTE(review): the alias looks like a typo for "upper_categorized"; it is
    # kept unchanged because other cells may reference this column name.
    "upper(category) as upper_cateroized",
    # concat() returns NULL when any argument is NULL, so a NULL product yields
    # a NULL product_suffix (see row 00005).
    "concat(product, '~zeyo') as product_suffix",
    "spendby",
    # 0 for cash; 1 for any other spendby value, including NULL.
    "case when spendby='cash' then 0 else 1 end as status"
)

procdf.show()

Downloaded: test.txt
Spark Session and Context initialized successfully!
------ORIGINAL DATAFRAME-------
+-----+----------+------+-----------+-------------+-------+
|   id|     tdate|amount|   category|      product|spendby|
+-----+----------+------+-----------+-------------+-------+
|00000|06-26-2011|   200|   Exercise|GymnasticsPro|   cash|
|00001|05-26-2011|   300|   Exercise|Weightlifting| credit|
|00002|06-01-2011|   100|   Exercise|GymnasticsPro|   cash|
|00003|06-05-2011|   100| Gymnastics|        Rings| credit|
|00004|12-17-2011|   300|Team Sports|        Field|  paytm|
|00005|02-14-2011|   200| Gymnastics|         NULL|   cash|
+-----+----------+------+-----------+-------------+-------+

root
 |-- id: string (nullable = true)
 |-- tdate: string (nullable = true)
 |-- amount: long (nullable = true)
 |-- category: string (nullable = true)
 |-- product: string (nullable = true)
 |-- spendby: string (nullable = true)

+-----+----+----------------+----------------+-----------------