> run in databricks

# first spark code

In [None]:
# Create a simple DataFrame with 5 rows
spark.range(5).show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [None]:
# can use other languages as well
%sql
select "hello world"

hello world
hello world


In [None]:
dbutils.fs.help()

In [None]:
# List files
files = dbutils.fs.ls("/databricks-datasets/")
for file in files:
    print(file.path)

dbfs:/databricks-datasets/COVID/
dbfs:/databricks-datasets/README.md
dbfs:/databricks-datasets/Rdatasets/
dbfs:/databricks-datasets/SPARK_README.md
dbfs:/databricks-datasets/adult/
dbfs:/databricks-datasets/airlines/
dbfs:/databricks-datasets/amazon/
dbfs:/databricks-datasets/asa/
dbfs:/databricks-datasets/atlas_higgs/
dbfs:/databricks-datasets/bikeSharing/
dbfs:/databricks-datasets/cctvVideos/
dbfs:/databricks-datasets/credit-card-fraud/
dbfs:/databricks-datasets/cs100/
dbfs:/databricks-datasets/cs110x/
dbfs:/databricks-datasets/cs190/
dbfs:/databricks-datasets/data.gov/
dbfs:/databricks-datasets/definitive-guide/
dbfs:/databricks-datasets/delta-sharing/
dbfs:/databricks-datasets/flights/
dbfs:/databricks-datasets/flower_photos/
dbfs:/databricks-datasets/flowers/
dbfs:/databricks-datasets/genomics/
dbfs:/databricks-datasets/hail/
dbfs:/databricks-datasets/identifying-campaign-effectiveness/
dbfs:/databricks-datasets/iot/
dbfs:/databricks-datasets/iot-stream/
dbfs:/databricks-datasets/

In [None]:
display(files)

path,name,size,modificationTime
dbfs:/databricks-datasets/COVID/,COVID/,0,1765991212603
dbfs:/databricks-datasets/README.md,README.md,976,1596557781000
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0,1765991212603
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359,1596557823000
dbfs:/databricks-datasets/adult/,adult/,0,1765991212603
dbfs:/databricks-datasets/airlines/,airlines/,0,1765991212603
dbfs:/databricks-datasets/amazon/,amazon/,0,1765991212603
dbfs:/databricks-datasets/asa/,asa/,0,1765991212603
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0,1765991212603
dbfs:/databricks-datasets/bikeSharing/,bikeSharing/,0,1765991212603


In [None]:
# single file
file = dbutils.fs.head('dbfs:/databricks-datasets/COVID/covid-19-data/us-counties-recent.csv')
display(file)

In [None]:
# Read secrets
api_key = dbutils.secrets.get(scope="my-scope", key="api-key")
print(api_key)

In [None]:
# Notebook widgets for parameters
dbutils.widgets.text("start_date", "2024-01-01")
start_date = dbutils.widgets.get("start_date")
print(start_date)

2024-01-01


# SparkSession

In [None]:
# from pyspark.sql import SparkSession

# # Create SparkSession (in Databricks, 'spark' is pre-created)
# spark = SparkSession.builder \
#     .appName("MyApp") \
#     .config("spark.sql.shuffle.partitions", "200") \
#     .getOrCreate()

# # In Databricks notebooks, simply use:
spark  # Already available

<pyspark.sql.connect.session.SparkSession at 0xff7a392f8dd0>

# RDD
Use RDD When:
- DataFrame operations don't support your use case
- Need **fine-grained control** over execution
- Working with **unstructured data (text processing)**
- **Custom partitioning** logic required
- **Legacy Spark code**

## Simple RDD Creation
> **Note:** Databricks Community Edition uses "Serverless" compute, which doesn't support direct RDD operations via `sparkContext`. The following RDD examples raise an error there.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RDD_Demo").getOrCreate()
sc = spark.sparkContext  # SparkContext for RDD operations

# Create RDD from Python list
numbers_rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

print(f"Type: {type(numbers_rdd)}")  # <class 'pyspark.rdd.RDD'>
print(f"Partitions: {numbers_rdd.getNumPartitions()}") # Partitions: 8

# Collect RDD to driver (brings data back to Python)
result = numbers_rdd.collect()
print(f"Data: {result}")  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

'''
output:  
Type: <class 'pyspark.rdd.RDD'>
Partitions: 8
Data: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
'''

## RDD Transformations and Actions
RDD operations are divided into Transformations (lazy) and Actions (eager).    

In [None]:
# TRANSFORMATIONS (Lazy - don't execute until action is called)

# map: Apply function to each element
squared_rdd = numbers_rdd.map(lambda x: x ** 2)

# filter: Keep elements that satisfy condition
even_rdd = numbers_rdd.filter(lambda x: x % 2 == 0)

# flatMap: Map then flatten
words_rdd = sc.parallelize(["hello world", "spark rdd"])
words_flat_rdd = words_rdd.flatMap(lambda line: line.split(" "))

# Nothing is computed yet - these are just instructions!

# ACTIONS (Eager - trigger computation)

print(squared_rdd.collect())  # [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
print(even_rdd.collect())     # [2, 4, 6, 8, 10]
print(words_flat_rdd.collect())  # ['hello', 'world', 'spark', 'rdd']

### Common Transformations
Transformations (Lazy): Return a new RDD without computing results immediately.

| Transformation            | Description                          | Example                                   |
|---------------------------|--------------------------------------|-------------------------------------------|
| `map(func)`               | Apply function to each element       | `rdd.map(lambda x: x * 2)`                 |
| `filter(func)`            | Keep elements where func returns True| `rdd.filter(lambda x: x > 5)`              |
| `flatMap(func)`           | Map then flatten results             | `rdd.flatMap(lambda x: x.split())`         |
| `distinct()`              | Remove duplicates                    | `rdd.distinct()`                           |
| `union(other)`            | Combine two RDDs                     | `rdd1.union(rdd2)`                         |
| `intersection(other)`     | Common elements                      | `rdd1.intersection(rdd2)`                  |
| `reduceByKey(func)`       | Aggregate values by key              | `rdd.reduceByKey(lambda x, y: x + y)`      |
| `groupByKey()`            | Group values by key                  | `rdd.groupByKey()`                         |
| `sortByKey()`             | Sort by keys                         | `rdd.sortByKey()`                          |

### Common Actions
Actions (Eager): Trigger computation and return results to driver or write to storage.

| Action                 | Description                           | Example                                      |
|------------------------|---------------------------------------|----------------------------------------------|
| `collect()`            | Return all elements to driver         | `rdd.collect()`                              |
| `count()`              | Number of elements                        | `rdd.count()`                                |
| `first()`              | Return the first element              | `rdd.first()`                                |
| `take(n)`              | Return the first n elements           | `rdd.take(5)`                                |
| `reduce(func)`         | Aggregate all elements                    | `rdd.reduce(lambda x, y: x + y)`             |
| `foreach(func)`        | Apply function to each element without return (side effects)         | `rdd.foreach(print)`                         |
| `saveAsTextFile(path)` | Write RDD to a text file              | `rdd.saveAsTextFile("path")`                 |
| `countByKey()`         | Count elements per key                | `rdd.countByKey()`                           |

## Key-Value RDD Operations

In [None]:
# Create key-value pairs
pairs_rdd = sc.parallelize([
    ("apple", 5),
    ("banana", 3),
    ("apple", 2),
    ("orange", 4),
    ("banana", 6)
])

# reduceByKey: Aggregate values for each key
totals_rdd = pairs_rdd.reduceByKey(lambda x, y: x + y)
print(totals_rdd.collect())  
# [('apple', 7), ('banana', 9), ('orange', 4)]

# groupByKey: Group all values for each key
grouped_rdd = pairs_rdd.groupByKey()
# Convert to list to see values
result = grouped_rdd.mapValues(list)
print(result.collect())
# [('apple', [5, 2]), ('banana', [3, 6]), ('orange', [4])]

# sortByKey: Sort by keys
sorted_rdd = totals_rdd.sortByKey()
print(sorted_rdd.collect())
# [('apple', 7), ('banana', 9), ('orange', 4)]

## Real-World RDD Example: Word Count
The "Hello World" of Spark

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WordCount").getOrCreate()
sc = spark.sparkContext

# Create RDD from text
text_rdd = sc.parallelize([
    "Hello world",
    "Hello Spark",
    "Spark is awesome",
    "RDD is fundamental"
])

# Word count pipeline.
# The chain is wrapped in parentheses instead of being continued with
# backslashes: a backslash must be the last character on its line, so a
# trailing comment after it is a syntax error.
word_count = (
    text_rdd
    .flatMap(lambda line: line.lower().split(" "))   # Split into words
    .map(lambda word: (word, 1))                     # Create (word, 1) pairs
    .reduceByKey(lambda x, y: x + y)                 # Sum counts per word
    .sortBy(lambda pair: pair[1], ascending=False)   # Sort by count
)

# Display results
for word, count in word_count.collect():
    print(f"{word}: {count}")

# Deliberately no spark.stop() here: the cells that follow keep using `sc`,
# and a stopped session would make them fail.

"""
**Output:**
hello: 2
spark: 2
is: 2
world: 1
awesome: 1
rdd: 1
fundamental: 1
"""

**Visualization of what happens:**
```text
Input RDD:
["Hello world", "Hello Spark", "Spark is awesome", "RDD is fundamental"]
    ↓ flatMap(split)
["hello", "world", "hello", "spark", "spark", "is", "awesome", "rdd", "is", "fundamental"]
    ↓ map(word → (word, 1))
[("hello", 1), ("world", 1), ("hello", 1), ("spark", 1), ("spark", 1), ...]
    ↓ reduceByKey(sum)
[("hello", 2), ("world", 1), ("spark", 2), ("is", 2), ...]
    ↓ sortBy(count)
[("hello", 2), ("spark", 2), ("is", 2), ("world", 1), ...]
```

## RDD Partitioning
RDDs are split into partitions for parallel processing

In [None]:
# Build an RDD that is explicitly spread over four partitions
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8], numSlices=4)

print(f"Partitions: {rdd.getNumPartitions()}")  # 4


# Inspect how the elements are laid out across the partitions
def print_partition(iterator):
    """Yield the whole content of one partition as a single list."""
    elements = [item for item in iterator]
    yield elements


partitions_data = rdd.mapPartitions(print_partition).collect()
print(partitions_data)
# [[1, 2], [3, 4], [5, 6], [7, 8]]

# repartition() can raise or lower the number of partitions
rdd_repartitioned = rdd.repartition(2)
print(f"New partitions: {rdd_repartitioned.getNumPartitions()}")  # 2

# coalesce() can only lower the number of partitions (more efficient)
rdd_coalesced = rdd.coalesce(2)

## RDD Persistence (Caching)
Cache RDDs in memory for reuse.

In [None]:
# Create RDD
numbers_rdd = sc.parallelize(range(1, 1000000))

# Perform expensive transformation
expensive_rdd = numbers_rdd.map(lambda x: x ** 2).filter(lambda x: x > 100)

# Cache in memory (first time computes, subsequent times reuses)
expensive_rdd.cache()  # or .persist()

# First action - computes and caches
count1 = expensive_rdd.count()
print(f"Count: {count1}")

# Second action - uses cached data (much faster!)
sum_result = expensive_rdd.reduce(lambda x, y: x + y)
print(f"Sum: {sum_result}")

# Unpersist when done
expensive_rdd.unpersist()

### Storage Levels:

In [None]:
from pyspark import StorageLevel

# Different caching strategies.
# The storage level of an RDD cannot be changed while one is already assigned,
# so every persist() is followed by unpersist() before the next level is tried.
rdd.persist(StorageLevel.MEMORY_ONLY)      # Default cache()
rdd.unpersist()

rdd.persist(StorageLevel.MEMORY_AND_DISK)  # Spill to disk if needed
rdd.unpersist()

rdd.persist(StorageLevel.DISK_ONLY)        # Only on disk
rdd.unpersist()

rdd.persist(StorageLevel.MEMORY_ONLY_2)    # Replicate 2x
rdd.unpersist()

## Complete RDD Example: Log File Analysis

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LogAnalysis").getOrCreate()
sc = spark.sparkContext

# Sample log data; every line has the form "<date> <LEVEL> <message words...>"
logs_rdd = sc.parallelize([
    "2024-01-01 ERROR Database connection failed",
    "2024-01-01 INFO User login successful",
    "2024-01-01 ERROR File not found",
    "2024-01-02 WARN Memory usage high",
    "2024-01-02 ERROR Network timeout",
    "2024-01-02 INFO User logout",
    "2024-01-03 ERROR Database connection failed"
])

print("="*50)
print("Log File Analysis with RDD")
print("="*50)

# 1. Count total logs
total_logs = logs_rdd.count()
print(f"\nTotal logs: {total_logs}") # Total logs: 7

# 2. Filter ERROR logs
error_logs = logs_rdd.filter(lambda line: "ERROR" in line)
print(f"Error logs: {error_logs.count()}") # Error logs: 4

# 3. Count by log level
# Whitespace-split fields: [0] = date, [1] = level, [2:] = message words
log_levels = logs_rdd \
    .map(lambda line: line.split()[1]) \
    .map(lambda level: (level, 1)) \
    .reduceByKey(lambda x, y: x + y)

print("\nLog level counts:")
for level, count in log_levels.collect():
    print(f"  {level}: {count}")

# 4. Most common error message
# The message is everything after the date and level fields
error_messages = error_logs \
    .map(lambda line: " ".join(line.split()[2:])) \
    .map(lambda msg: (msg, 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .sortBy(lambda pair: pair[1], ascending=False)

print("\nMost common errors:")
for error, count in error_messages.take(3):
    print(f"  {error}: {count}")

# 5. Group by date
logs_by_date = logs_rdd \
    .map(lambda line: (line.split()[0], 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .sortByKey()

print("\nLogs by date:")
for date, count in logs_by_date.collect():
    print(f"  {date}: {count}")

# Deliberately no spark.stop() here: the session is shared by the whole
# notebook and later cells keep using `spark`.

"""
**Output:**
```
==================================================
Log File Analysis with RDD
==================================================

Total logs: 7
Error logs: 4

Log level counts:
  ERROR: 4
  INFO: 2
  WARN: 1

Most common errors:
  Database connection failed: 2
  File not found: 1
  Network timeout: 1

Logs by date:
  2024-01-01: 3
  2024-01-02: 3
  2024-01-03: 1
```
"""

# Dataframe
Use DataFrame When:
- Most of the time! (**99% of cases**)
- **Structured or semi-structured** data
- **SQL-like** operations
- Need **automatic optimization**
- Better **performance**
- **Easier** to write and maintain

## DataFrame Structure
A DataFrame has:

### 1. Schema (Structure Definition)
Defines column names and data types

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Schema_Demo").getOrCreate()

# Create DataFrame
df = spark.createDataFrame([
    ('Alice', 25, 85000.50),
    ('Bob', 30, 95000.75),
    ('Charlie', 35, 105000.00)
], ['name', 'age', 'salary'])

# Print schema
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- salary: double (nullable = true)



### 2. Rows (Data Records)
Each row represents one record:

In [None]:
# Get first row
first_row = df.first()
print(first_row)
# Row(name='Alice', age=25, salary=85000.5)

# Access specific value
print(first_row.name)    # Alice
print(first_row['age'])  # 25

Row(name='Alice', age=25, salary=85000.5)
Alice
25


### 3. Columns (Fields)
Each column has a name and contains one type of data:

In [None]:
# Select specific columns
df.select('name', 'age').show()


+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+



## Creating DataFrames


### Method 1: From Python Lists/Tuples

In [None]:
# Simple data
data = [
    ('Alice', 25, 'Engineering'),
    ('Bob', 30, 'Marketing'),
    ('Charlie', 35, 'Sales')
]

columns = ['name', 'age', 'department']

df = spark.createDataFrame(data, columns)
df.show()

+-------+---+-----------+
|   name|age| department|
+-------+---+-----------+
|  Alice| 25|Engineering|
|    Bob| 30|  Marketing|
|Charlie| 35|      Sales|
+-------+---+-----------+



### Method 2: From Pandas DataFrame

In [None]:
import pandas as pd

# Create pandas DataFrame
pandas_df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie'],
    'age': [25, 30, 35]
})

# Convert to Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)
spark_df.show()

+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+



### Method 3: From Files (Most Common)

In [None]:
# CSV: the first line is treated as the header and column types are inferred
df_csv = spark.read.csv(
    'data/employees.csv',
    header=True,
    inferSchema=True,
)
# or: spark.read.option('header', 'true').csv('xxx.csv')

# JSON
df_json = spark.read.json('data/employees.json')

# Parquet
df_parquet = spark.read.parquet('data/employees.parquet')

# Relational database table over JDBC
df_db = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/mydb")
    .option("dbtable", "employees")
    .option("user", "username")
    .option("password", "password")
    .load()
)

### Method 4: With Explicit Schema

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Define schema
schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("age", IntegerType(), nullable=True),
    StructField("salary", DoubleType(), nullable=True)
])

# Create DataFrame with schema
data = [
    ('Alice', 25, 85000.50),
    ('Bob', 30, 95000.75)
]

df = spark.createDataFrame(data, schema)
df.printSchema()
df.show()

root
 |-- name: string (nullable = false)
 |-- age: integer (nullable = true)
 |-- salary: double (nullable = true)

+-----+---+--------+
| name|age|  salary|
+-----+---+--------+
|Alice| 25| 85000.5|
|  Bob| 30|95000.75|
+-----+---+--------+



## Basic DataFrame Operations

In [None]:
# using covid data
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# Explicit schema for the us-counties CSV (all fields nullable)
covid_schema = StructType([
    StructField('date', DateType(), True),
    StructField('county', StringType(), True),
    StructField('state', StringType(), True),
    StructField('fips', StringType(), True),  # kept as string: values such as 01001 carry leading zeros
    StructField('cases', IntegerType(), True),
    StructField('deaths', IntegerType(), True)
])
covid_df = spark.read.option('header', 'true').schema(covid_schema).csv('dbfs:/databricks-datasets/COVID/covid-19-data/us-counties-recent.csv')

# The cells that follow refer to this data as `df`; without this binding they
# would still see the small employee DataFrame created in the previous section.
df = covid_df

### 1. Viewing Data

In [None]:
df.show()           # Default: 20 rows

+----------+---------+-------+-----+-----+------+
|      date|   county|  state| fips|cases|deaths|
+----------+---------+-------+-----+-----+------+
|2021-02-11|  Autauga|Alabama|01001| 5970|    81|
|2021-02-11|  Baldwin|Alabama|01003|18960|   240|
|2021-02-11|  Barbour|Alabama|01005| 2030|    46|
|2021-02-11|     Bibb|Alabama|01007| 2377|    54|
|2021-02-11|   Blount|Alabama|01009| 5955|   116|
|2021-02-11|  Bullock|Alabama|01011| 1136|    32|
|2021-02-11|   Butler|Alabama|01013| 1886|    64|
|2021-02-11|  Calhoun|Alabama|01015|12539|   257|
|2021-02-11| Chambers|Alabama|01017| 3305|    92|
|2021-02-11| Cherokee|Alabama|01019| 1738|    37|
|2021-02-11|  Chilton|Alabama|01021| 3672|    87|
|2021-02-11|  Choctaw|Alabama|01023|  542|    23|
|2021-02-11|   Clarke|Alabama|01025| 3367|    43|
|2021-02-11|     Clay|Alabama|01027| 1386|    53|
|2021-02-11| Cleburne|Alabama|01029| 1335|    33|
|2021-02-11|   Coffee|Alabama|01031| 5125|    90|
|2021-02-11|  Colbert|Alabama|01033| 5906|   112|


In [None]:
# Show first n rows
df.show(5)          # First 5 rows

+----------+-------+-------+-----+-----+------+
|      date| county|  state| fips|cases|deaths|
+----------+-------+-------+-----+-----+------+
|2021-02-11|Autauga|Alabama|01001| 5970|    81|
|2021-02-11|Baldwin|Alabama|01003|18960|   240|
|2021-02-11|Barbour|Alabama|01005| 2030|    46|
|2021-02-11|   Bibb|Alabama|01007| 2377|    54|
|2021-02-11| Blount|Alabama|01009| 5955|   116|
+----------+-------+-------+-----+-----+------+
only showing top 5 rows


In [None]:
df.show(2, truncate=False)  # Don't truncate long strings

+----------+-------+-------+-----+-----+------+
|date      |county |state  |fips |cases|deaths|
+----------+-------+-------+-----+-----+------+
|2021-02-11|Autauga|Alabama|01001|5970 |81    |
|2021-02-11|Baldwin|Alabama|01003|18960|240   |
+----------+-------+-------+-----+-----+------+
only showing top 2 rows


In [None]:
# Display schema
df.printSchema()

# Get column names
print('columns: ', df.columns)   # ['date', 'county', 'state', 'fips', 'cases', 'deaths']

# Count rows
print('\nnumber of rows: ', df.count())   # 94127

# Get first row
print('\nfirst row: ', df.first())

# Get first n rows as list
print('\ndf.head(2): \n', df.head(2))
print('\ndf.take(2): \n', df.take(2))   # Same as head()


root
 |-- date: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- fips: string (nullable = true)
 |-- cases: string (nullable = true)
 |-- deaths: string (nullable = true)

columns:  ['date', 'county', 'state', 'fips', 'cases', 'deaths']

number of rows:  94127

first row:  Row(date='2021-02-11', county='Autauga', state='Alabama', fips='01001', cases='5970', deaths='81')

df.head(2): 
 [Row(date='2021-02-11', county='Autauga', state='Alabama', fips='01001', cases='5970', deaths='81'), Row(date='2021-02-11', county='Baldwin', state='Alabama', fips='01003', cases='18960', deaths='240')]

df.take(2): 
 [Row(date='2021-02-11', county='Autauga', state='Alabama', fips='01001', cases='5970', deaths='81'), Row(date='2021-02-11', county='Baldwin', state='Alabama', fips='01003', cases='18960', deaths='240')]


In [None]:
# Summary statistics
print('describe: ')
df.describe().show()

describe: 
+-------+----------+---------+-------+------------------+-----------------+------------------+
|summary|      date|   county|  state|              fips|            cases|            deaths|
+-------+----------+---------+-------+------------------+-----------------+------------------+
|  count|     94127|    94127|  94127|             93322|            94127|             91865|
|   mean|      NULL|     NULL|   NULL| 31471.97141081417|8752.064742316232|159.76511184890873|
| stddev|      NULL|     NULL|   NULL|16365.411306322301|34483.31580445032| 762.3375598967227|
|    min|2021-02-11|Abbeville|Alabama|             01001|                0|                 0|
|    max|2021-03-11|  Ziebach|Wyoming|             78030|             9999|               999|
+-------+----------+---------+-------+------------------+-----------------+------------------+



In [None]:
print('summary: ')
df.summary().show()  # More detailed

summary: 
+-------+----------+---------+-------+------------------+-----------------+------------------+
|summary|      date|   county|  state|              fips|            cases|            deaths|
+-------+----------+---------+-------+------------------+-----------------+------------------+
|  count|     94127|    94127|  94127|             93322|            94127|             91865|
|   mean|      NULL|     NULL|   NULL| 31471.97141081417|8752.064742316232|159.76511184890873|
| stddev|      NULL|     NULL|   NULL|16365.411306322301|34483.31580445032| 762.3375598967227|
|    min|2021-02-11|Abbeville|Alabama|             01001|                0|                 0|
|    25%|      NULL|     NULL|   NULL|           19035.0|            916.0|              16.0|
|    50%|      NULL|     NULL|   NULL|           30025.0|           2185.0|              41.0|
|    75%|      NULL|     NULL|   NULL|           46119.0|           5633.0|             100.0|
|    max|2021-03-11|  Ziebach|Wyoming|  

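**Why is `max` wrong here?** The `max` values shown above (`9999` cases, `999` deaths) are not the true maxima. `describe()` and `summary()` scan the full dataset, not a sample. The real cause is that this output was produced while every column was typed as `string` (see the `printSchema()` output above), so `min` and `max` are compared lexicographically as text, and `'9999'` sorts after `'1208672'`. Only the percentile rows of `summary()` are approximate.

To get the true maximum, always cast to a numeric type and use an explicit aggregation like:

```python
from pyspark.sql import functions as F

df.agg(F.max(F.col('cases').cast('int'))).show()
```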

### 2. Selecting Columns

In [None]:
# Select single column
df.select('county').show(5)

# Select multiple columns
df.select('county', 'deaths').show(5)

# Using col() function
from pyspark.sql.functions import col

df.select(col('county'), col('deaths')).show(5)

# Select all columns
df.select('*').show(2)

# Select with expressions
df.select(
    col('county'),
    (col('deaths') + 5).alias('death_plus_5'),
    (col('cases') * 1.1).alias('cases_with_raise')
).show(5)


+-------+
| county|
+-------+
|Autauga|
|Baldwin|
|Barbour|
|   Bibb|
| Blount|
+-------+
only showing top 5 rows
+-------+------+
| county|deaths|
+-------+------+
|Autauga|    81|
|Baldwin|   240|
|Barbour|    46|
|   Bibb|    54|
| Blount|   116|
+-------+------+
only showing top 5 rows
+-------+------+
| county|deaths|
+-------+------+
|Autauga|    81|
|Baldwin|   240|
|Barbour|    46|
|   Bibb|    54|
| Blount|   116|
+-------+------+
only showing top 5 rows
+----------+-------+-------+-----+-----+------+
|      date| county|  state| fips|cases|deaths|
+----------+-------+-------+-----+-----+------+
|2021-02-11|Autauga|Alabama|01001| 5970|    81|
|2021-02-11|Baldwin|Alabama|01003|18960|   240|
+----------+-------+-------+-----+-----+------+
only showing top 2 rows
+-------+------------+------------------+
| county|death_plus_5|  cases_with_raise|
+-------+------------+------------------+
|Autauga|          86| 6567.000000000001|
|Baldwin|         245|           20856.0|
|Barbour| 

### 3. Filtering Rows

In [None]:
# True maxima of the case and death counts.
# The Spark functions are reached through the `F` namespace so that the
# aggregate is Spark's max and not Python's built-in max(), which cannot
# aggregate a Column.
from pyspark.sql import functions as F

# cast('int') makes the comparison numeric even when the column holds strings
df.agg(F.max(F.col('cases').cast('int'))).show()
df.agg(F.max(F.col('deaths').cast('int'))).show()

+-----------------------+
|max(CAST(cases AS INT))|
+-----------------------+
|                1208672|
+-----------------------+

+------------------------+
|max(CAST(deaths AS INT))|
+------------------------+
|                   30068|
+------------------------+



In [None]:
# Filter with condition
df.filter(col('cases') > 1208600).show()
df.where(col('cases') > 1208600).show()  # Same as filter()

# Multiple conditions (AND)
df.filter(
    (col('cases') > 1200000) & (col('deaths') > 30000)
).show()

# Multiple conditions (OR)
df.filter(
    (col('cases') < 100) | (col('deaths') > 30050)
).show()

# Filter with SQL string
df.filter("cases > 1208600 AND deaths > 20000").show()

# NOT condition
df.filter(~(col('cases') > 100)).show()

+----------+-----------+----------+-----+-------+------+
|      date|     county|     state| fips|  cases|deaths|
+----------+-----------+----------+-----+-------+------+
|2021-03-11|Los Angeles|California|06037|1208672| 22304|
+----------+-----------+----------+-----+-------+------+

+----------+-----------+----------+-----+-------+------+
|      date|     county|     state| fips|  cases|deaths|
+----------+-----------+----------+-----+-------+------+
|2021-03-11|Los Angeles|California|06037|1208672| 22304|
+----------+-----------+----------+-----+-------+------+

+----+------+-----+----+-----+------+
|date|county|state|fips|cases|deaths|
+----+------+-----+----+-----+------+
+----+------+-----+----+-----+------+

+----------+--------------------+----------+-----+-----+------+
|      date|              county|     state| fips|cases|deaths|
+----------+--------------------+----------+-----+-----+------+
|2021-02-11|      Denali Borough|    Alaska|02068|   67|     0|
|2021-02-11|      H

### 4. Adding/Modifying Columns

In [None]:
df.columns

['date', 'county', 'state', 'fips', 'cases', 'deaths']

In [None]:
from pyspark.sql.functions import col, lit, when

# Overwrite a column with a constant value
# ('county' already exists, so withColumn replaces it instead of adding a column)
df_county_orange = df.withColumn('county', lit('Orange'))
df_county_orange.show(5)

# Add calculated column
df_deaths_adjust = df.withColumn('deaths_adjust', col('deaths') * 1.1)
df_deaths_adjust.show(5)

# Add conditional column: one distinct marker per bucket
#   cases < 10000            -> '!'
#   10000 <= cases < 100000  -> '!!'
#   everything else          -> '!!!'
df_with_category = df.withColumn('case_group',
    when(col('cases') < 10000, '!')
    .when((col('cases') >= 10000) & (col('cases') < 100000), '!!')
    .otherwise('!!!')
)

df_with_category.show(5)


+----------+------+-------+-----+-----+------+
|      date|county|  state| fips|cases|deaths|
+----------+------+-------+-----+-----+------+
|2021-02-11|Orange|Alabama|01001| 5970|    81|
|2021-02-11|Orange|Alabama|01003|18960|   240|
|2021-02-11|Orange|Alabama|01005| 2030|    46|
|2021-02-11|Orange|Alabama|01007| 2377|    54|
|2021-02-11|Orange|Alabama|01009| 5955|   116|
+----------+------+-------+-----+-----+------+
only showing top 5 rows
+----------+-------+-------+-----+-----+------+------------------+
|      date| county|  state| fips|cases|deaths|     deaths_adjust|
+----------+-------+-------+-----+-----+------+------------------+
|2021-02-11|Autauga|Alabama|01001| 5970|    81| 89.10000000000001|
|2021-02-11|Baldwin|Alabama|01003|18960|   240|             264.0|
|2021-02-11|Barbour|Alabama|01005| 2030|    46|              50.6|
|2021-02-11|   Bibb|Alabama|01007| 2377|    54|59.400000000000006|
|2021-02-11| Blount|Alabama|01009| 5955|   116|127.60000000000001|
+----------+-----

### 5. Renaming Columns

In [None]:
data = [
    ('Alice', 25, 'Engineering'),
    ('Bob', 30, 'Marketing'),
    ('Charlie', 35, 'Sales')
]

columns = ['name', 'age', 'department']

df = spark.createDataFrame(data, columns)
df.show()

+-------+---+-----------+
|   name|age| department|
+-------+---+-----------+
|  Alice| 25|Engineering|
|    Bob| 30|  Marketing|
|Charlie| 35|      Sales|
+-------+---+-----------+



In [None]:
# Rename single column
df_renamed = df.withColumnRenamed('name', 'employee_name')

# Rename multiple columns (chain operations)
df_renamed = df \
    .withColumnRenamed('name', 'employee_name') \
    .withColumnRenamed('age', 'employee_age')

df_renamed.show()

+-------------+------------+-----------+
|employee_name|employee_age| department|
+-------------+------------+-----------+
|        Alice|          25|Engineering|
|          Bob|          30|  Marketing|
|      Charlie|          35|      Sales|
+-------------+------------+-----------+



### 6. Dropping Columns

In [None]:
# Drop single column
df_dropped = df.drop('age')
df_dropped.show()

# Drop multiple columns
df_dropped = df.drop('age', 'department')
df_dropped.show()

+-------+-----------+
|   name| department|
+-------+-----------+
|  Alice|Engineering|
|    Bob|  Marketing|
|Charlie|      Sales|
+-------+-----------+

+-------+
|   name|
+-------+
|  Alice|
|    Bob|
|Charlie|
+-------+



## Aggregations and Grouping

In [None]:
# Create sample data
data = [
    ('Alice', 25, 'Engineering', 85000),
    ('Bob', 30, 'Engineering', 95000),
    ('Charlie', 35, 'Marketing', 75000),
    ('Diana', 28, 'Marketing', 80000),
    ('Eve', 32, 'Sales', 70000)
]

df = spark.createDataFrame(data, ['name', 'age', 'department', 'salary'])
df.show()

+-------+---+-----------+------+
|   name|age| department|salary|
+-------+---+-----------+------+
|  Alice| 25|Engineering| 85000|
|    Bob| 30|Engineering| 95000|
|Charlie| 35|  Marketing| 75000|
|  Diana| 28|  Marketing| 80000|
|    Eve| 32|      Sales| 70000|
+-------+---+-----------+------+



### 1. Basic Aggregations

In [None]:
# NOTE: sum, max and min imported here have the same names as Python's
# built-ins, so after this line those names refer to the Spark column
# functions in this namespace.
from pyspark.sql.functions import count, sum, avg, max, min, stddev

# Single aggregation: average salary over the whole DataFrame
df.agg(avg('salary')).show()

# Multiple aggregations in one pass, each given an explicit column name
df.agg(
    count('*').alias('total_employees'),
    avg('salary').alias('avg_salary'),
    max('salary').alias('max_salary'),
    min('salary').alias('min_salary')
).show()

+-----------+
|avg(salary)|
+-----------+
|    81000.0|
+-----------+

+---------------+----------+----------+----------+
|total_employees|avg_salary|max_salary|min_salary|
+---------------+----------+----------+----------+
|              5|   81000.0|     95000|     70000|
+---------------+----------+----------+----------+



### 2. GroupBy Operations

In [None]:
# Group by single column
df.groupBy('department').count().show()

# Group by with aggregations
department_stats = df.groupBy('department').agg(
    count('*').alias('employee_count'),
    avg('salary').alias('avg_salary'),
    max('salary').alias('max_salary'),
    min('age').alias('min_age')
)

department_stats.show()

+-----------+-----+
| department|count|
+-----------+-----+
|Engineering|    2|
|  Marketing|    2|
|      Sales|    1|
+-----------+-----+

+-----------+--------------+----------+----------+-------+
| department|employee_count|avg_salary|max_salary|min_age|
+-----------+--------------+----------+----------+-------+
|Engineering|             2|   90000.0|     95000|     25|
|  Marketing|             2|   77500.0|     80000|     28|
|      Sales|             1|   70000.0|     70000|     32|
+-----------+--------------+----------+----------+-------+



## Sorting Data

In [None]:
# Ascending sort on one column; sort() behaves the same as orderBy()
youngest_first = df.orderBy('age')
youngest_first.show()
df.sort('age').show()

# Descending sort on one column
age_descending = col('age').desc()
df.orderBy(age_descending).show()

# Two sort keys: department ascending, then salary descending within it
salary_descending = col('salary').desc()
df.orderBy('department', salary_descending).show()

# The same ordering written with the asc()/desc() helper functions
from pyspark.sql.functions import asc, desc

sort_keys = [asc('department'), desc('salary')]
df.orderBy(*sort_keys).show()

+-------+---+-----------+------+
|   name|age| department|salary|
+-------+---+-----------+------+
|  Alice| 25|Engineering| 85000|
|  Diana| 28|  Marketing| 80000|
|    Bob| 30|Engineering| 95000|
|    Eve| 32|      Sales| 70000|
|Charlie| 35|  Marketing| 75000|
+-------+---+-----------+------+

+-------+---+-----------+------+
|   name|age| department|salary|
+-------+---+-----------+------+
|  Alice| 25|Engineering| 85000|
|  Diana| 28|  Marketing| 80000|
|    Bob| 30|Engineering| 95000|
|    Eve| 32|      Sales| 70000|
|Charlie| 35|  Marketing| 75000|
+-------+---+-----------+------+

+-------+---+-----------+------+
|   name|age| department|salary|
+-------+---+-----------+------+
|Charlie| 35|  Marketing| 75000|
|    Eve| 32|      Sales| 70000|
|    Bob| 30|Engineering| 95000|
|  Diana| 28|  Marketing| 80000|
|  Alice| 25|Engineering| 85000|
+-------+---+-----------+------+

+-------+---+-----------+------+
|   name|age| department|salary|
+-------+---+-----------+------+
|    Bo

## Handling Missing Data (NULL Values)

In [None]:
# Build a small DataFrame in which each column contains exactly one null
data = [
    ('Alice', 25, 85000),
    ('Bob', None, 95000),
    ('Charlie', 35, None),
    (None, 28, 80000),
]
column_names = ['name', 'age', 'salary']

df = spark.createDataFrame(data, column_names)
print('original df:  ')
df.show()

print('Drop rows with ANY null values:  ')
rows_without_nulls = df.dropna()
rows_without_nulls.show()

print('Drop rows with ALL null values: ')
rows_not_entirely_null = df.dropna(how='all')
rows_not_entirely_null.show()

print('Drop rows with null in specific columns: ')
required_columns = ['name', 'age']
df.dropna(subset=required_columns).show()

print('Fill null values with 0:  ')
zero_filled = df.fillna(0)  # a numeric fill value only touches numeric columns
zero_filled.show()

print('Fill with different values per column:  ')
per_column_defaults = {'age': 30, 'salary': 75000}
df.fillna(per_column_defaults).show()

print('Fill with aggregate (average age):  ')
# Pull the single aggregated value back to the driver before using it as a fill value
average_age = df.agg(avg('age')).collect()[0][0]
aggregate_defaults = {
    'name': 'Unknown',
    'age': average_age,
    'salary': 0,
}
df.fillna(aggregate_defaults).show()

original df:  
+-------+----+------+
|   name| age|salary|
+-------+----+------+
|  Alice|  25| 85000|
|    Bob|NULL| 95000|
|Charlie|  35|  NULL|
|   NULL|  28| 80000|
+-------+----+------+

Drop rows with ANY null values:  
+-----+---+------+
| name|age|salary|
+-----+---+------+
|Alice| 25| 85000|
+-----+---+------+

Drop rows with ALL null values: 
+-------+----+------+
|   name| age|salary|
+-------+----+------+
|  Alice|  25| 85000|
|    Bob|NULL| 95000|
|Charlie|  35|  NULL|
|   NULL|  28| 80000|
+-------+----+------+

Drop rows with null in specific columns: 
+-------+---+------+
|   name|age|salary|
+-------+---+------+
|  Alice| 25| 85000|
|Charlie| 35|  NULL|
+-------+---+------+

Fill null values with 0:  
+-------+---+------+
|   name|age|salary|
+-------+---+------+
|  Alice| 25| 85000|
|    Bob|  0| 95000|
|Charlie| 35|     0|
|   NULL| 28| 80000|
+-------+---+------+

Fill with different values per column:  
+-------+---+------+
|   name|age|salary|
+-------+---+------+

## Joins

In [None]:
# Employees: one row per employee, keyed by emp_id
employee_rows = [
    (1, 'Alice', 'Engineering'),
    (2, 'Bob', 'Marketing'),
    (3, 'Charlie', 'Sales'),
]
employees = spark.createDataFrame(employee_rows, ['emp_id', 'name', 'department'])

# Salaries: emp_id 4 deliberately has no matching employee
salary_rows = [
    (1, 85000),
    (2, 75000),
    (3, 70000),
    (4, 90000),
]
salaries = spark.createDataFrame(salary_rows, ['emp_id', 'salary'])

employees.show()
salaries.show()

+------+-------+-----------+
|emp_id|   name| department|
+------+-------+-----------+
|     1|  Alice|Engineering|
|     2|    Bob|  Marketing|
|     3|Charlie|      Sales|
+------+-------+-----------+

+------+------+
|emp_id|salary|
+------+------+
|     1| 85000|
|     2| 75000|
|     3| 70000|
|     4| 90000|
+------+------+



In [None]:
# Inner join (the default): only emp_ids present on both sides
joined = employees.join(salaries, on='emp_id', how='inner')
joined.show()

# Left outer join: every employee, with salary where one exists
left_join = employees.join(salaries, on='emp_id', how='left')
left_join.show()

# Right outer join: every salary row, with employee details where they exist
right_join = employees.join(salaries, on='emp_id', how='right')
right_join.show()

# Full outer join: all rows from both sides
full_join = employees.join(salaries, on='emp_id', how='outer')
full_join.show()

# When the key columns have different names, join on an explicit condition, e.g.:
# employees.join(salaries, employees.emp_id == salaries.id, 'inner')

+------+-------+-----------+------+
|emp_id|   name| department|salary|
+------+-------+-----------+------+
|     1|  Alice|Engineering| 85000|
|     2|    Bob|  Marketing| 75000|
|     3|Charlie|      Sales| 70000|
+------+-------+-----------+------+

+------+-------+-----------+------+
|emp_id|   name| department|salary|
+------+-------+-----------+------+
|     1|  Alice|Engineering| 85000|
|     2|    Bob|  Marketing| 75000|
|     3|Charlie|      Sales| 70000|
+------+-------+-----------+------+

+------+-------+-----------+------+
|emp_id|   name| department|salary|
+------+-------+-----------+------+
|     1|  Alice|Engineering| 85000|
|     2|    Bob|  Marketing| 75000|
|     3|Charlie|      Sales| 70000|
|     4|   NULL|       NULL| 90000|
+------+-------+-----------+------+

+------+-------+-----------+------+
|emp_id|   name| department|salary|
+------+-------+-----------+------+
|     1|  Alice|Engineering| 85000|
|     2|    Bob|  Marketing| 75000|
|     3|Charlie|      Sal

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper, length

spark = SparkSession.builder.appName("UDF_Demo").getOrCreate()

# Sample data: one-element tuples, so the DataFrame has a single 'email' column
data = [
    ('alice@email.com',),
    ('bob@company.org',),
    ('charlie@test.co.uk',)
]

df = spark.createDataFrame(data, ['email'])

# ✅ Built-in functions work great for simple operations
df.select(
    col('email'),
    upper(col('email')).alias('email_upper'),
    length(col('email')).alias('email_length')
).show()

# ❌ But what if you need custom logic?
# Extract domain from email: "alice@email.com" → "email.com"
# NOTE: built-ins such as split() or substring_index() can handle this particular
# case; it serves here only as a simple motivating example for writing a UDF.

+------------------+------------------+------------+
|             email|       email_upper|email_length|
+------------------+------------------+------------+
|   alice@email.com|   ALICE@EMAIL.COM|          15|
|   bob@company.org|   BOB@COMPANY.ORG|          15|
|charlie@test.co.uk|CHARLIE@TEST.CO.UK|          18|
+------------------+------------------+------------+



### With Window Functions

## User-Defined Functions (UDFs)

In [None]:
# Define custom Python function
def extract_domain(email):
    """Return the domain part of an email address.

    The domain is everything after the LAST '@', so an address whose local
    part itself contains '@' (e.g. '"a@b"@example.com') still yields the
    real domain.

    Args:
        email: The address to inspect; may be None or empty.

    Returns:
        The text after the last '@', or None when `email` is None, empty,
        or contains no '@'.
    """
    if not email or '@' not in email:
        return None
    # rpartition splits on the last separator; index 2 is the part after it
    return email.rpartition('@')[2]

# Wrap the Python function as a Spark UDF whose result type is string
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

extract_domain_udf = udf(extract_domain, returnType=StringType())

# The UDF is applied to a column just like a built-in function
email_domain = extract_domain_udf(col('email'))
df.withColumn('domain', email_domain).show()

+------------------+-----------+
|             email|     domain|
+------------------+-----------+
|   alice@email.com|  email.com|
|   bob@company.org|company.org|
|charlie@test.co.uk| test.co.uk|
+------------------+-----------+



### Without Window Functions (using groupBy)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName("WindowDemo").getOrCreate()

# Sample rows: (department, name, salary)
data = [
    ('Sales', 'Alice', 5000),
    ('Sales', 'Bob', 6000),
    ('Sales', 'Charlie', 5500),
    ('Marketing', 'Diana', 7000),
    ('Marketing', 'Eve', 6500),
]
employee_columns = ['department', 'name', 'salary']

df = spark.createDataFrame(data, employee_columns)

# groupBy collapses each department to a single row, so the individual
# employee names and salaries are no longer available in the result
mean_salary = avg('salary').alias('dept_avg')
dept_avg = df.groupBy('department').agg(mean_salary)
dept_avg.show()

+----------+--------+
|department|dept_avg|
+----------+--------+
|     Sales|  5500.0|
| Marketing|  6750.0|
+----------+--------+



In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col

# One window per department
window_spec = Window.partitionBy('department')

# Evaluate the average over the window so that EVERY row receives its
# department's average as an extra column
dept_avg_over_window = avg('salary').over(window_spec)
df_with_avg = df.withColumn('dept_avg', dept_avg_over_window)
df_with_avg.show()

# ✅ All individual rows are kept AND the department average is added


+----------+-------+------+--------+
|department|   name|salary|dept_avg|
+----------+-------+------+--------+
| Marketing|  Diana|  7000|  6750.0|
| Marketing|    Eve|  6500|  6750.0|
|     Sales|  Alice|  5000|  5500.0|
|     Sales|    Bob|  6000|  5500.0|
|     Sales|Charlie|  5500|  5500.0|
+----------+-------+------+--------+



## Window functions
Window functions are special functions that perform calculations across a set of rows related to the current row, without collapsing those rows the way groupBy() does.

## SQL Queries on DataFrames
DataFrames can be queried using SQL!

In [None]:
# Sample rows: (name, age, department, salary)
data = [
    ('Alice', 25, 'Engineering', 85000),
    ('Bob', 30, 'Engineering', 95000),
    ('Charlie', 35, 'Marketing', 75000),
    ('Diana', 28, 'Marketing', 80000),
    ('Eve', 32, 'Sales', 70000),
]
employee_columns = ['name', 'age', 'department', 'salary']

df = spark.createDataFrame(data, employee_columns)
df.show()

+-------+---+-----------+------+
|   name|age| department|salary|
+-------+---+-----------+------+
|  Alice| 25|Engineering| 85000|
|    Bob| 30|Engineering| 95000|
|Charlie| 35|  Marketing| 75000|
|  Diana| 28|  Marketing| 80000|
|    Eve| 32|      Sales| 70000|
+-------+---+-----------+------+



In [None]:
# Expose the DataFrame to Spark SQL under the view name "employees"
df.createOrReplaceTempView("employees")

# Head count and mean salary per department for employees older than 25,
# highest mean salary first
department_query = """
    SELECT department, 
           COUNT(*) as employee_count,
           AVG(salary) as avg_salary
    FROM employees
    WHERE age > 25
    GROUP BY department
    ORDER BY avg_salary DESC
"""
result = spark.sql(department_query)

result.show()

+-----------+--------------+----------+
| department|employee_count|avg_salary|
+-----------+--------------+----------+
|Engineering|             1|   95000.0|
|  Marketing|             2|   77500.0|
|      Sales|             1|   70000.0|
+-----------+--------------+----------+



## Complete Real-World Example

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, count, when


def _print_banner(title, leading_blank=True):
    """Print `title` framed by two 50-character '=' rules.

    Args:
        title: Text printed between the rules.
        leading_blank: When True, an empty line is printed before the banner.
    """
    rule = "=" * 50
    if leading_blank:
        print()
    print(rule)
    print(title)
    print(rule)


# Initialize Spark
spark = SparkSession.builder.appName("Sales_Analysis").getOrCreate()

# Sample sales rows: (date, category, product, price, quantity)
sales_data = [
    ('2024-01-01', 'Electronics', 'Laptop', 1200, 2),
    ('2024-01-01', 'Electronics', 'Mouse', 25, 10),
    ('2024-01-02', 'Clothing', 'Shirt', 40, 5),
    ('2024-01-02', 'Electronics', 'Keyboard', 75, 3),
    ('2024-01-03', 'Clothing', 'Pants', 60, 4),
    ('2024-01-03', 'Electronics', 'Monitor', 300, 2)
]

df = spark.createDataFrame(
    sales_data,
    ['date', 'category', 'product', 'price', 'quantity']
)

_print_banner("Original Sales Data", leading_blank=False)
df.show()

# Add a calculated column: revenue of each row
df_with_total = df.withColumn('total_amount', col('price') * col('quantity'))

_print_banner("With Total Amount")
df_with_total.show()

# Aggregate by category, highest revenue first
category_summary = df_with_total.groupBy('category').agg(
    count('*').alias('transaction_count'),
    sum('quantity').alias('total_units_sold'),
    sum('total_amount').alias('total_revenue'),
    avg('price').alias('avg_price')
).orderBy(col('total_revenue').desc())

_print_banner("Category Summary")
category_summary.show()

# Keep only items priced above 50
expensive_items = df_with_total.filter(col('price') > 50)

_print_banner("Expensive Items (price > $50)")
expensive_items.show()

# Bucket each row by price: < 50 Budget, 50..199 Mid-range, otherwise Premium
df_categorized = df_with_total.withColumn('price_category',
    when(col('price') < 50, 'Budget')
    .when((col('price') >= 50) & (col('price') < 200), 'Mid-range')
    .otherwise('Premium')
)

_print_banner("With Price Categories")
df_categorized.show()

# Row count and revenue per price bucket
price_category_summary = df_categorized.groupBy('price_category').agg(
    count('*').alias('product_count'),
    sum('total_amount').alias('total_revenue')
)

_print_banner("Revenue by Price Category")
price_category_summary.show()

# NOTE: spark.stop() is intentionally NOT called. This notebook runs on
# Databricks, where the session is shared and the following cells keep using
# `spark`; stopping it here would break them. Call spark.stop() only at the
# very end of a standalone script.

Original Sales Data
+----------+-----------+--------+-----+--------+
|      date|   category| product|price|quantity|
+----------+-----------+--------+-----+--------+
|2024-01-01|Electronics|  Laptop| 1200|       2|
|2024-01-01|Electronics|   Mouse|   25|      10|
|2024-01-02|   Clothing|   Shirt|   40|       5|
|2024-01-02|Electronics|Keyboard|   75|       3|
|2024-01-03|   Clothing|   Pants|   60|       4|
|2024-01-03|Electronics| Monitor|  300|       2|
+----------+-----------+--------+-----+--------+


With Total Amount
+----------+-----------+--------+-----+--------+------------+
|      date|   category| product|price|quantity|total_amount|
+----------+-----------+--------+-----+--------+------------+
|2024-01-01|Electronics|  Laptop| 1200|       2|        2400|
|2024-01-01|Electronics|   Mouse|   25|      10|         250|
|2024-01-02|   Clothing|   Shirt|   40|       5|         200|
|2024-01-02|Electronics|Keyboard|   75|       3|         225|
|2024-01-03|   Clothing|   Pants|   

## Complete DF Example: Log File Analysis
The same log analysis written with DataFrames, for comparison with the RDD version.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, split

spark = SparkSession.builder.appName("LogAnalysis").getOrCreate()

# One raw log line per row, in a single column named "log"
raw_log_rows = [
    ("2024-01-01 ERROR Database connection failed",),
    ("2024-01-01 INFO User login successful",),
    ("2024-01-01 ERROR File not found",),
    ("2024-01-02 WARN Memory usage high",),
    ("2024-01-02 ERROR Network timeout",),
    ("2024-01-02 INFO User logout",),
    ("2024-01-03 ERROR Database connection failed",),
]
logs_df = spark.createDataFrame(raw_log_rows, ["log"])

# Split each line on spaces: token 0 is the date, token 1 is the level
log_tokens = split(col("log"), " ")
with_date = logs_df.withColumn("date", log_tokens[0])
logs_parsed = with_date.withColumn("level", log_tokens[1])

# Number of log lines per level
lines_per_level = logs_parsed.groupBy("level").count()
lines_per_level.show()

# Number of log lines per date
lines_per_date = logs_parsed.groupBy("date").count()
lines_per_date.show()

# Much cleaner and faster!

# Converting Between RDD and DataFrame


## DataFrame to RDD

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Conversion").getOrCreate()

# Small DataFrame of (name, age) rows
people_rows = [
    ("Alice", 25),
    ("Bob", 30),
    ("Charlie", 35),
]
df = spark.createDataFrame(people_rows, ["name", "age"])

# The .rdd attribute exposes the DataFrame as an RDD of Row objects
rdd = df.rdd

collected_rows = rdd.collect()
print(collected_rows)
# [Row(name='Alice', age=25), Row(name='Bob', age=30), Row(name='Charlie', age=35)]

# Map each Row to a plain (name, age) tuple
rdd_tuples = df.rdd.map(lambda row: (row["name"], row["age"]))
collected_tuples = rdd_tuples.collect()
print(collected_tuples)
# [('Alice', 25), ('Bob', 30), ('Charlie', 35)]

## RDD to DataFrame

In [None]:
# Method 1: pass only column names and let Spark infer the column types
people_tuples = [
    ("Alice", 25),
    ("Bob", 30),
]
rdd = sc.parallelize(people_tuples)

df = spark.createDataFrame(rdd, ["name", "age"])
df.show()

# Method 2: state the column types explicitly with a StructType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

name_field = StructField("name", StringType(), True)
age_field = StructField("age", IntegerType(), True)
schema = StructType([name_field, age_field])

df = spark.createDataFrame(rdd, schema)
df.show()

# ETL

## Extract file/data

In [None]:
# Import only the name that is needed: `import *` hides where names come from
# and should be avoided in production code.
from pyspark.sql import SparkSession

# Basically: spark = SparkSession.builder.getOrCreate()
# appName() and config() set options on the builder before the session is obtained.
spark = SparkSession.builder.appName("Spark DataFrames").config("test", "some value").getOrCreate()


In [None]:
# Read the CSV with default options: columns get generic names (_c0, _c1, ...)
# and the file's header line shows up as the first data row
covid_csv_path = 'dbfs:/databricks-datasets/COVID/covid-19-data/us-counties-recent.csv'
covid_df_no_header = spark.read.csv(covid_csv_path)
covid_df_no_header.show(5)

+----------+-------+-------+-----+-----+------+
|       _c0|    _c1|    _c2|  _c3|  _c4|   _c5|
+----------+-------+-------+-----+-----+------+
|      date| county|  state| fips|cases|deaths|
|2021-02-11|Autauga|Alabama|01001| 5970|    81|
|2021-02-11|Baldwin|Alabama|01003|18960|   240|
|2021-02-11|Barbour|Alabama|01005| 2030|    46|
|2021-02-11|   Bibb|Alabama|01007| 2377|    54|
+----------+-------+-------+-----+-----+------+
only showing top 5 rows


In [None]:
# Treat the first line of the file as column names
header_reader = spark.read.option('header', 'true')
covid_df = header_reader.csv('dbfs:/databricks-datasets/COVID/covid-19-data/us-counties-recent.csv')
first_five = covid_df.limit(5)
display(first_five)

date,county,state,fips,cases,deaths
2021-02-11,Autauga,Alabama,1001,5970,81
2021-02-11,Baldwin,Alabama,1003,18960,240
2021-02-11,Barbour,Alabama,1005,2030,46
2021-02-11,Bibb,Alabama,1007,2377,54
2021-02-11,Blount,Alabama,1009,5955,116


In [None]:
# Print column names and types; without an explicit schema every CSV column
# is read as a string (see the output below)
covid_df.printSchema()

root
 |-- date: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- fips: string (nullable = true)
 |-- cases: string (nullable = true)
 |-- deaths: string (nullable = true)



In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# (column name, Spark type) for every CSV column, in file order; all nullable
covid_columns = [
    ('date', DateType()),
    ('county', StringType()),
    ('state', StringType()),
    ('fips', StringType()),
    ('cases', IntegerType()),
    ('deaths', IntegerType()),
]
covid_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in covid_columns
])

# Read with the header row skipped and the explicit schema applied
typed_reader = spark.read.option('header', 'true').schema(covid_schema)
covid_df = typed_reader.csv('dbfs:/databricks-datasets/COVID/covid-19-data/us-counties-recent.csv')
display(covid_df)

date,county,state,fips,cases,deaths
2021-02-11,Autauga,Alabama,1001.0,5970,81.0
2021-02-11,Baldwin,Alabama,1003.0,18960,240.0
2021-02-11,Barbour,Alabama,1005.0,2030,46.0
2021-02-11,Bibb,Alabama,1007.0,2377,54.0
2021-02-11,Blount,Alabama,1009.0,5955,116.0
2021-02-11,Bullock,Alabama,1011.0,1136,32.0
2021-02-11,Butler,Alabama,1013.0,1886,64.0
2021-02-11,Calhoun,Alabama,1015.0,12539,257.0
2021-02-11,Chambers,Alabama,1017.0,3305,92.0
2021-02-11,Cherokee,Alabama,1019.0,1738,37.0


## Transform

In [None]:
# Rows for Orange County, California, newest date first; display() renders
# the result as a table in the notebook
is_orange_california = (covid_df.state == 'California') & (covid_df.county == 'Orange')
newest_first = covid_df['date'].desc()
display(covid_df.filter(is_orange_california).orderBy(newest_first))

date,county,state,fips,cases,deaths
2021-03-11,Orange,California,6059,263279,4379
2021-03-10,Orange,California,6059,263111,4346
2021-03-09,Orange,California,6059,262995,4313
2021-03-08,Orange,California,6059,262849,4252
2021-03-07,Orange,California,6059,262674,4226
2021-03-06,Orange,California,6059,262550,4173
2021-03-05,Orange,California,6059,262241,4075
2021-03-04,Orange,California,6059,261976,4013
2021-03-03,Orange,California,6059,261798,3966
2021-03-02,Orange,California,6059,261608,3952


### SQL query on df
Register the DataFrame as a temporary view so that Spark SQL can see it;
it can then be queried with ordinary SQL.

In [None]:
# Register (or replace) a temporary view named "covid" backed by covid_df,
# so SQL cells can refer to it by that name
covid_df.createOrReplaceTempView("covid")

In [None]:
%sql
-- All rows of the "covid" temp view for counties named 'Orange', in any state
select * from covid
where county = 'Orange'

date,county,state,fips,cases,deaths
2021-02-11,Orange,California,6059,255254,3493
2021-02-11,Orange,Florida,12095,108208,1014
2021-02-11,Orange,Indiana,18117,1698,50
2021-02-11,Orange,New York,36071,34072,607
2021-02-11,Orange,North Carolina,37135,7279,89
2021-02-11,Orange,Texas,48361,7037,98
2021-02-11,Orange,Vermont,50017,462,2
2021-02-11,Orange,Virginia,51137,1708,19
2021-02-12,Orange,California,6059,255254,3493
2021-02-12,Orange,Florida,12095,108616,1014


In [None]:
# Write the DataFrame as snappy-compressed Parquet, replacing any existing
# output at the target location.
# NOTE(review): `path` is not defined in this cell; it presumably holds the
# output directory (ending in '/') from an earlier cell — confirm before running.
filename = 'test.parquet'
file_dir = path + filename
covid_df.write.option('compression', 'snappy').mode('overwrite').parquet(file_dir)

### eg: txt -> RDD -> dataframe

assume there is a people.txt file:      
name,  age,  gender        
alex,  32,  M       
bob,  33,  M        
Amily,  30,  F    

Its structure is loose: the fields are padded with extra spaces, so the values need cleaning while the file is parsed.

In [None]:
from pyspark.sql import Row

sc = spark.sparkContext

# textFile() returns an RDD[str] with one element per line of the file
lines = sc.textFile('people.txt')

# The first line holds the column names, not data; drop it (and any blank
# lines) before parsing
header = lines.first()
records = lines.filter(lambda line: line.strip() != '' and line != header)

# RDD[str] -> RDD[list[str]]: split on commas and strip the padding around
# each field, e.g. 'alex,  32,  M' -> ['alex', '32', 'M']
fields = records.map(lambda line: [value.strip() for value in line.split(',')])

# RDD[list[str]] -> RDD[Row]: name the fields and convert age to int
people = fields.map(lambda f: Row(name=f[0], age=int(f[1]), gender=f[2]))

# RDD[Row] -> DataFrame; the column names come from the Row fields
people_df = spark.createDataFrame(people)

### data validation

In [None]:
# Reference snippets for inspecting and cleaning a DataFrame. Column names
# ("columnName", "firstname", "age", "user_id", ...) are placeholders, and the
# snippets are independent examples rather than one runnable pipeline.
from pyspark.sql import functions as F
from pyspark.sql.functions import col, explode, when

# --- Inspecting ---
df.dtypes  # attribute, not a method: list of (column name, type) pairs
df.show(100)
# Prints the first 100 rows; Spark may still have to do much more work than
# 100 rows' worth to produce them, so use it carefully on large data

df.head()
df.first()
df.take(5)  # list of the first 5 Row objects; bounded, so safer than collect()

df.schema  # a StructType object, convenient for programmatic use
df.printSchema()  # more human-readable

df.describe()  # returns a DataFrame of summary statistics; add .show() to print it

# Use collect() carefully: it returns ALL the data to the driver, which is dangerous
df.count()
# DataFrame has no mean()/stddev()/min()/max() methods; aggregate a column instead
df.agg(F.mean("age"), F.stddev("age"), F.min("age"), F.max("age"))

df.columns  # attribute, not a method: list of column names
df.distinct().count()

df.explain()

# --- Duplicates and nulls ---
df.dropDuplicates()
df.dropDuplicates(["user_id", "date"])  # the subset of columns is passed as a list
df.dropna()
df.fillna(0)  # a fill value is required
df.na.drop()
df.na.fill(0)  # a fill value is required
df.na.replace("N/A", "unknown")  # (value to replace, replacement)

# --- Selecting and deriving columns ---
df.select("columnName").show()
df.select("columnName", "columnName2", explode("columnName3")).show()
df.select("columnName", "columnName2", explode("columnName3").alias("newColumnName")).show()

df.select(col("columnName"), (col("age") + 1).alias("age_plus_one")).show()

# Parenthesize the comparison before calling alias() on it
df.select((col("age") > 24).alias("age_gt_24")).show()

df.select("firstname", when(col("age") > 20, 1).otherwise(0).alias("is_age_gt_20"))

df.select("firstname", col("firstname").isin("Jane", "Alex").alias("is_in_list"))
df.select("firstname", col("firstname").like("Jane%").alias("is_like"))  # scans the whole table; can be very slow
df.select("firstname", col("firstname").startswith("Jane").alias("is_startswith"))
df.select("firstname", col("firstname").endswith("Jane").alias("is_endswith"))

df.select(col("firstname").substr(1, 3))  # positions start from 1, not zero: smith -> smi
df.select(col("age").between(22, 24))

# --- Adding, renaming, dropping, ordering ---
df = df.withColumn("age_plus_one", col("age") + 1)
df = df.withColumnRenamed("age", "age_new")


df.drop("age")
df.drop("age", "firstname")

df.orderBy(col("age").desc())
df.groupby()