# MODULE 4 - BIG DATA FUNDAMENTALS

## 4.1.1.1 Data Volume

In [15]:
# ── Cell 1: configure Spark env and init findspark ──
import os

# 1) Where you unpacked Spark
os.environ['SPARK_HOME'] = '/Users/jfelixsb/spark'

# 2) Your JDK path
os.environ['JAVA_HOME'] = '/Library/Java/JavaVirtualMachines/jdk-21.jdk/Contents/Home'

# 3) Put Spark’s bin first in PATH
os.environ['PATH'] = os.path.join(os.environ['SPARK_HOME'], 'bin') + ':' + os.environ.get('PATH', '')

# 4) (Optional) helps PySpark find everything
#    Make sure you’ve `pip install findspark` in your venv
import findspark

findspark.init()

In [11]:
from pyspark import SparkConf, SparkContext

conf = SparkConf() \
    .setAppName("FacebookInteractionsCount") \
    .setMaster("local[*]")

# This will either return your already-running sc, or make it fresh
sc = SparkContext.getOrCreate(conf=conf)

# sanity check
print("Spark v.", sc._gateway.jvm.org.apache.spark.SPARK_VERSION)

data = sc.textFile('example_files/facebook_interactions.txt')

elements = data.collect()
print(elements)  # Not nice, prints an aray

for elem in elements:
    print(elem)

num_interactions = data.count()

print(f'Number of interactions: {num_interactions}')

Spark v. <py4j.java_gateway.JavaPackage object at 0x115cafb50>
['userId,postId,interactionType,timestamp', '1,1001,like,2025-07-02T10:00:00Z', '2,1001,comment,2025-07-02T10:01:23Z', '3,1002,share,2025-07-02T10:05:12Z', '4,1003,like,2025-07-02T10:08:45Z', '5,1002,like,2025-07-02T10:09:01Z', '6,1003,comment,2025-07-02T10:12:34Z', '7,1004,share,2025-07-02T10:15:55Z', '8,1001,like,2025-07-02T10:18:20Z', '9,1005,comment,2025-07-02T10:20:11Z', '10,1002,like,2025-07-02T10:22:47Z', '11,1004,like,2025-07-02T10:25:30Z', '12,1005,share,2025-07-02T10:28:05Z']
userId,postId,interactionType,timestamp
1,1001,like,2025-07-02T10:00:00Z
2,1001,comment,2025-07-02T10:01:23Z
3,1002,share,2025-07-02T10:05:12Z
4,1003,like,2025-07-02T10:08:45Z
5,1002,like,2025-07-02T10:09:01Z
6,1003,comment,2025-07-02T10:12:34Z
7,1004,share,2025-07-02T10:15:55Z
8,1001,like,2025-07-02T10:18:20Z
9,1005,comment,2025-07-02T10:20:11Z
10,1002,like,2025-07-02T10:22:47Z
11,1004,like,2025-07-02T10:25:30Z
12,1005,share,2025-07-02T10:28

## 4.1.1.2 Data Variety

In [17]:
import os

print("Working directory:", os.getcwd())
!ls -la


Working directory: /Users/jfelixsb/PycharmProjects/Masters_Big_Data_Data_Analytics
total 1016
drwxr-xr-x@ 10 jfelixsb  staff     320 Jul  2 16:13 [34m.[m[m
drwxr-xr-x  15 jfelixsb  staff     480 Jun 13 16:52 [34m..[m[m
drwxr-xr-x@ 14 jfelixsb  staff     448 Jul  2 15:26 [34m.git[m[m
drwxr-xr-x@  9 jfelixsb  staff     288 Jul  2 16:13 [34m.idea[m[m
drwxr-xr-x@ 10 jfelixsb  staff     320 Jul  2 14:46 [34m.venv[m[m
drwxr-xr-x@  3 jfelixsb  staff      96 Jul  2 15:32 [34martifacts[m[m
drwxr-xr-x@  7 jfelixsb  staff     224 Jul  2 15:29 [34mexample_files[m[m
-rw-r--r--@  1 jfelixsb  staff   43581 Jun 14 11:53 module2.ipynb
-rw-r--r--@  1 jfelixsb  staff  447307 Jun 26 12:26 module3.ipynb
-rw-r--r--@  1 jfelixsb  staff   20523 Jul  2 16:13 module4.ipynb


In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col

# Crear una sesión Spark
spark = (
    SparkSession.builder
    .appName("TwitterDataAnalysis")
    .master("local[*]")
    .getOrCreate()
)

# Leer los datos de Twitter en formato JSON
tweets_df = (spark.read
             .option("multiline", True)
             .json("example_files/tweets.json"))
print(tweets_df)
tweets_df.printSchema()

# 4) Explode the array into separate rows
tweets = tweets_df.select(explode(col("data")).alias("tweet"))

# 5) Pull out the text field
tweets_text = tweets.select(col("tweet.text").alias("text"))

# Mostrar los primeros 5 tweets
tweets_text.show()

# ((tweets_df
#  .select(explode(col("data")).alias("tweet"))
#  .select(col("tweet.text").alias("text")))
#  .show())

DataFrame[data: array<struct<id:bigint,likes:bigint,text:string,timestamp:string,user:string>>]
root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- likes: long (nullable = true)
 |    |    |-- text: string (nullable = true)
 |    |    |-- timestamp: string (nullable = true)
 |    |    |-- user: string (nullable = true)

+--------------------+
|                text|
+--------------------+
|Just had the best...|
|RT @charlie: Spar...|
|Preparing my talk...|
|Sunny day in Mont...|
|Does anyone know ...|
|Deploying my firs...|
|Lunch break! 🍔 #...|
|Debugging streami...|
|Schema evolution ...|
|Wrapping up today...|
+--------------------+

+--------------------+
|                text|
+--------------------+
|Just had the best...|
|RT @charlie: Spar...|
|Preparing my talk...|
|Sunny day in Mont...|
|Does anyone know ...|
|Deploying my firs...|
|Lunch break! 🍔 #...|
|Debugging streami...|
|Schema evo

## 4.1.1.5 Data Quality

In [29]:
import pandas as pd

In [55]:
# Cargar los datos
df = pd.read_csv('example_files/data_quality_sales.csv')

print('Initial df\n', df)

# Corregir los errores de entrada de datos
df['email'] = df['email'].str.lower()
df['email'] = df['email'].fillna('No Data')
print('\nLowercasing emails\n', df)

df['sales'] = pd.to_numeric(df['sales'], errors='coerce')
print('\nTurning non-numeric cells into NaN\n', df)

# Imputar los valores faltantes con la media
# Important to make sure to only compute the mean for the numeric cols
df['sales'] = df['sales'].fillna(df['sales'].mean())
df['temperature'] = df['temperature'].fillna(df['temperature'].mean())
df['humidity'] = df['humidity'].fillna(df['humidity'].mean())
print('\nReplacing the NaN on the numeric cols by the mean of that col\n', df)


# Normalizar las fechas a un formato común
df['date'] = pd.to_datetime(df['date'], errors='coerce')
# df['date'] = df['date'].fillna('No Data')
print('\nReplacing missing dates by NaT\n', df)

# Seleccionar las características relevantes
# df = df['date', 'sales', 'temperature', 'humidity']
print('\nFinal version\n', df[['date', 'email', 'sales', 'temperature']])

Initial df
            date                 email         sales  temperature  humidity
0    2025-07-01     Alice@example.COM           100         22.5      0.30
1    2025-07-02       BOB@Example.com         200.5         21.0       NaN
2    2025-07-03   Charlie@Example.COM           NaN         20.0      0.40
3  invalid_date  danielle@Example.com  not a number         22.0      0.45
4    2025-07-05                   NaN         150.0         24.0      0.50

Lowercasing emails
            date                 email         sales  temperature  humidity
0    2025-07-01     alice@example.com           100         22.5      0.30
1    2025-07-02       bob@example.com         200.5         21.0       NaN
2    2025-07-03   charlie@example.com           NaN         20.0      0.40
3  invalid_date  danielle@example.com  not a number         22.0      0.45
4    2025-07-05               No Data         150.0         24.0      0.50

Turning non-numeric cells into NaN
            date               