# 

# START ZOOKEPER SERVICE
### RUN THIS COMMAND IN A TERMINAL
```
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties

```
# START KAFKA BROKERS
### RUN THIS COMMAND ON A DIFFERENT TERMINAL FOR EACH LINE
```
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server1.properties
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server2.properties
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server3.properties
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server4.properties
```
# LIST AVALABLE BROKERS
```
/usr/local/kafka/bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
```
# DELETE TOPICS
```
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic animals-topic-streaming
```

# LISTS TOPICS
```
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```

# DEPENDENCIES

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

# Directory where JARs are located
jars_directory = "/usr/local/spark/jars/"

# List of JAR filenames
jar_files = [
    "commons-pool2-2.11.1.jar",
    "kafka-clients-3.3.2.jar",
    "spark-sql-kafka-0-10_2.12-3.4.1.jar",
    "spark-token-provider-kafka-0-10_2.12-3.4.1.jar"
]

dependencies = ",".join([os.path.join(jars_directory, jar) for jar in jar_files])

# Configure Kafka connection
kafka_bootstrap_servers = "localhost:9092"
topic = "animals-topic-batch"

# Create Spark session and add JARs
spark_session = SparkSession.builder \
    .appName("WriteKafkaAnimals") \
    .config("spark.jars", dependencies) \
    .getOrCreate()

24/01/10 02:43:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# BATCH WRITING

In [2]:
def save_batch_data(spark_session, kafka_bootstrap_servers, topic, iterations=1, empty=0):

    # Create a sample DataFrame with animal data
    data = [("zebra", "mammal"), ("koala", "marsupial"), ("cheetah", "feline"),("dolphin", "mammal"),
            ("parrot", "bird"), ("rhino", "mammal"), ("panda", "mammal"), ("kangaroo", "marsupial"), 
            ("panther", "feline"), ("chimpanzee", "primate"), ("hippo", "mammal"), ("eagle", "bird"), 
            ("orangutan", "primate"), ("bear", "mammal"), ("owl", "bird"), ("polar bear", "mammal"), 
            ("snake", "reptile"), ("hawk", "bird"), ("fox", "mammal"), ("turtle", "reptile"), 
            ("swan", "bird"), ("jaguar", "feline"), ("seagull", "bird"), ("gazelle", "mammal")]

    values =  data if empty == 0 else [('', '')]
    
    columns = ["name", "type"]
    
    df_animals = spark_session.createDataFrame(values, columns)

    for iteration in range(iterations):
        # Write the DataFrame to Kafka topic
        df_animals.selectExpr("name as key", "type as value") \
            .write \
            .format("kafka") \
            .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
            .option("topic", topic) \
            .save()
    
    # Print a message indicating that the data has been written to the Kafka topic
    print(f"{iterations} Iterations, Data written to Kafka topic ({topic}).")

# Configure Kafka connection
kafka_bootstrap_servers = "localhost:9092"
topic = "animals-topic-batch"
save_batch_data(spark_session, kafka_bootstrap_servers, topic)

topic = "animals-topic-streaming"
save_batch_data(spark_session=spark_session, kafka_bootstrap_servers=kafka_bootstrap_servers, topic=topic, empty=1)

                                                                                

1 Iterations, Data written to Kafka topic (animals-topic-batch).
1 Iterations, Data written to Kafka topic (animals-topic-streaming).


24/01/10 02:43:21 WARN NetworkClient: [Producer clientId=producer-1] Error while fetching metadata with correlation id 5 : {animals-topic-streaming=LEADER_NOT_AVAILABLE}


# BATCH READING

In [3]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the hexadecimal decoding function
@udf(returnType=StringType())
def decode_hex(value):
    try:
        if isinstance(value, str):
            return bytes.fromhex(value).decode('utf-8')
        elif isinstance(value, bytearray):
            return bytes(value).decode('utf-8')
        else:
            return str(value)
    except (ValueError, UnicodeDecodeError):
        return str(value)
            
def read_batch_data(spark_session, kafka_bootstrap_servers, topic):

    # Try to read data from Kafka
    try:
        # Read data from Kafka
        df_kafka = spark_session \
            .read \
            .format("kafka") \
            .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
            .option("subscribe", topic) \
            .load()
    
        # Decode hexadecimal values
        df_decoded = df_kafka \
            .withColumn('key', decode_hex('key')) \
            .withColumn('value', decode_hex('value'))
    
        # Show the DataFrame with decoded data
        df_decoded.show(truncate=False)
    
    except Exception as e:
        if "UnknownTopicOrPartitionException" in str(e):
            print(f"The topic '{topic}' does not exist in the Kafka cluster.")
        else:
            print(f"Unexpected error: {e}")
    
    finally:
        # Stop the Spark session
        None

topic = "animals-topic-batch"
read_batch_data(spark_session, kafka_bootstrap_servers, topic)

24/01/10 02:43:21 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
24/01/10 02:43:22 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


+----------+---------+-------------------+---------+------+-----------------------+-------------+
|key       |value    |topic              |partition|offset|timestamp              |timestampType|
+----------+---------+-------------------+---------+------+-----------------------+-------------+
|polar bear|mammal   |animals-topic-batch|0        |0     |2024-01-10 02:39:30.433|0            |
|dolphin   |mammal   |animals-topic-batch|0        |1     |2024-01-10 02:39:30.433|0            |
|seagull   |bird     |animals-topic-batch|0        |2     |2024-01-10 02:39:30.432|0            |
|koala     |marsupial|animals-topic-batch|0        |3     |2024-01-10 02:39:30.432|0            |
|fox       |mammal   |animals-topic-batch|0        |4     |2024-01-10 02:39:30.432|0            |
|kangaroo  |marsupial|animals-topic-batch|0        |5     |2024-01-10 02:39:30.433|0            |
|bear      |mammal   |animals-topic-batch|0        |6     |2024-01-10 02:39:30.433|0            |
|parrot    |bird    

24/01/10 02:43:23 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/01/10 02:43:23 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/01/10 02:43:23 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/01/10 02:43:23 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/01/10 02:43:23 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/01/10 02:43:23 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

# STREAMING WRITING
## IT WILL BE LISTENING TO THE "animals-topic-batch" TOPIC
## WHEN NEW DATA ARRIVES, IT READS AND STORES THEM IN THE "animals-topic-streaming" TOPIC
## MAKE THE TRANSFORMATIONS

In [4]:
from pyspark.sql.functions import expr

def read_streaming_data(spark_session, kafka_bootstrap_servers, input_topic, output_topic, checkpoint_location, files_directory):

    #spark_session.sparkContext.setLogLevel("DEBUG")    
    # Read from Kafka in streaming mode
    kafkaStream = spark_session \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", input_topic) \
        .option("startingOffsets", "earliest") \
        .load()
    
    # Perform some transformation on the data (here we are simply renaming the columns and uppercase the values)
    transformedStream = kafkaStream.selectExpr( "CAST(key AS STRING) as key","UPPER(CAST(value AS STRING)) as value")

    # save into parquet file
    query_parquet = transformedStream \
      .writeStream \
      .outputMode("append") \
      .format("parquet") \
      .option("path", files_directory) \
      .option("checkpointLocation", checkpoint_location) \
      .trigger(processingTime="1 minute") \
      .start()

    query = transformedStream \
        .writeStream \
        .option("failOnDataLoss", "false") \
        .outputMode("append") \
        .format("kafka") \
        .option("truncate", "false") \
        .option("checkpointLocation", checkpoint_location) \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("topic", output_topic) \
        .start()

    print('kafkaStream', type(kafkaStream))
    print('transformedStream', type(transformedStream))
    print('query', type(query))

# Configure Kafka connection
kafka_bootstrap_servers = "localhost:9092"
input_topic = "animals-topic-batch"
output_topic = "animals-topic-streaming"
# Checkpoint directory within the Kafka directory
checkpoint_location = "/usr/local/kafka/data/checkpoint"
files_directory = "/usr/local/kafka/data/files"

read_streaming_data(spark_session, kafka_bootstrap_servers, input_topic, output_topic, checkpoint_location, files_directory)

kafkaStream <class 'pyspark.sql.dataframe.DataFrame'>
transformedStream <class 'pyspark.sql.dataframe.DataFrame'>
query <class 'pyspark.sql.streaming.query.StreamingQuery'>


24/01/10 02:43:23 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/01/10 02:43:23 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/01/10 02:43:23 WARN StreamingQueryManager: Stopping existing streaming query [id=f44f450a-1995-4949-946e-e8cd13ac893b, runId=0af9b616-b5a1-47db-808a-7618e95b267e], as a new run is being started.
24/01/10 02:43:23 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


## MASSIVE DATA INSERTION TO KAFKA FOR STREAMING READ
### COPY, PASTE, AND RUN THE FOLLOWING CODE IN ANOTHER NOTEBOOK TO OBSERVE STREAMING REDIN


```python
import os
from pyspark.sql import SparkSession
from time import sleep

# Directory where JARs are located
jars_directory = "/usr/local/spark/jars/"

# List of JAR filenames
jar_files = [
    "commons-pool2-2.11.1.jar",
    "kafka-clients-3.3.2.jar",
    "spark-sql-kafka-0-10_2.12-3.4.1.jar",
    "spark-token-provider-kafka-0-10_2.12-3.4.1.jar"
]

dependencies = ",".join([os.path.join(jars_directory, jar) for jar in jar_files])

# Configure Kafka connection
kafka_bootstrap_servers = "localhost:9092"
topic = "animals-topic-batch"


# Create Spark session and add JARs
spark_session = SparkSession.builder \
    .appName("WriteKafkaAnimals") \
    .config("spark.jars", dependencies) \
    .getOrCreate()


def save_batch_data(spark_session, kafka_bootstrap_servers, topic, iterations=1):

    # Create a sample DataFrame with animal data
    data = [("zebra", "mammal"), ("koala", "marsupial"), ("cheetah", "feline"),("dolphin", "mammal"),
            ("parrot", "bird"), ("rhino", "mammal"), ("panda", "mammal"), ("kangaroo", "marsupial"), 
            ("panther", "feline"), ("chimpanzee", "primate"), ("hippo", "mammal"), ("eagle", "bird"), 
            ("orangutan", "primate"), ("bear", "mammal"), ("owl", "bird"), ("polar bear", "mammal"), 
            ("snake", "reptile"), ("hawk", "bird"), ("fox", "mammal"), ("turtle", "reptile"), 
            ("swan", "bird"), ("jaguar", "feline"), ("seagull", "bird"), ("gazelle", "mammal")]
    
    columns = ["name", "type"]
    
    df_animals = spark_session.createDataFrame(data, columns)

    for iteration in range(iterations):
        sleep(0.2)
        # Write the DataFrame to Kafka topic
        df_animals.selectExpr("name as key", "type as value") \
            .write \
            .format("kafka") \
            .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
            .option("topic", topic) \
            .save()
        print (f'Iteration {iteration}, completed!!!')
    
    # Print a message indicating that the data has been written to the Kafka topic
    print(f"{iterations} Iterations, Data written to Kafka topic ({topic}).")

     # Finally, stop the Spark session
    spark_session.stop()

save_batch_data(spark_session, kafka_bootstrap_servers, topic, iterations=13)
```

# STREAMING READING

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

# Configure Kafka connection
kafka_bootstrap_servers = "localhost:9092"
topic = "animals-topic-streaming"

def read_streaming_data(spark_session,kafka_bootstrap_servers, topic):
   
    # Read from Kafka in streaming mode
    kafkaStream = spark_session.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", topic) \
        .option("startingOffsets", "earliest") \
        .load()
    
    # Show the read content
    query = kafkaStream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
        .writeStream \
        .outputMode("append") \
        .format("console") \
        .start() \
    
    # Wait for the stream to finish (adjust as needed)
    #query.awaitTermination()
    
    # Finally, stop the Spark session (you can stop it after the stream finishes)
    #spark_session.stop()

read_streaming_data(spark_session, kafka_bootstrap_servers, topic)

24/01/10 02:43:39 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-e8bcd1f0-de5b-4eb8-8d2c-034223502286. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/01/10 02:43:39 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/01/10 02:43:39 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


-------------------------------------------
Batch: 0
-------------------------------------------
+----------+---------+
|       key|    value|
+----------+---------+
|          |         |
|    jaguar|   FELINE|
|     hippo|   MAMMAL|
| orangutan|  PRIMATE|
|chimpanzee|  PRIMATE|
|    parrot|     BIRD|
|     panda|   MAMMAL|
|     eagle|     BIRD|
|     snake|  REPTILE|
|     koala|MARSUPIAL|
|     zebra|   MAMMAL|
|     rhino|   MAMMAL|
|polar bear|   MAMMAL|
|      hawk|     BIRD|
|   dolphin|   MAMMAL|
|   cheetah|   FELINE|
|    turtle|  REPTILE|
|  kangaroo|MARSUPIAL|
|      swan|     BIRD|
|   panther|   FELINE|
+----------+---------+
only showing top 20 rows

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+---------+
|       key|    value|
+----------+---------+
|     zebra|   MAMMAL|
|      bear|   MAMMAL|
|     panda|   MAMMAL|
|    jaguar|   FELINE|
|  kangaroo|MARSUPIAL|
|       fox|   MAMMAL|
|     koala|MARSUPIAL

# STREAMING FILES USING FROM DIRECTORIES

In [6]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, FloatType
from pyspark.sql.functions import expr
import shutil
from pyspark.sql import functions as F
from datetime import datetime
from pyspark.sql.functions import lit
from pyspark.sql.functions import col

def process_batch(directory_to_save_files, df, epoch_id):
    
    # Generate a timestamp
    timestamp_str = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")

    # Add epoch_id and timestamp_str as new columns
    df = df.withColumn("batch_id", lit(epoch_id)) \
           .withColumn("timestamp", lit(timestamp_str))

    # Print information about the new batch
    print(f"Processing microbatch {epoch_id} at {timestamp_str}")

    # Group by specified columns and calculate the average ticket price
    grouped_df = df.groupBy(
        "batch_id",
        "timestamp",
        "passenger_nationality",
        "passenger_gender",
        "passenger_age"
    ).agg(
        F.avg("ticket_price").alias("avg_ticket_price"),
        F.sum("ticket_price").alias("total_ticket_price"),
        F.count("passenger_name").alias("total_passengers")
    )

    # Filter out groups where the total number of passengers is greater than 1
    # filtered_df = grouped_df.filter(grouped_df.total_passengers > 1)
    # Multiple filter conditions 
    filtered_df = grouped_df.filter((col("total_passengers") > 2) & (col("total_passengers") <= 7))

    # Sort the DataFrame by passenger_nationality and passenger_age
    sorted_df = filtered_df.orderBy("total_passengers","passenger_nationality", "passenger_gender")

    # Show the sorted DataFrame
    sorted_df.show(truncate=False)

    # Coalesce to a single partition and write the DataFrame as Parquet with timestamp
    maximum_parquet_files_per_batch  = 3
    sorted_df.coalesce(maximum_parquet_files_per_batch ).write.parquet(f"{directory_to_save_files}/microbatch_{epoch_id}_{timestamp_str}")

    # Show the original DataFrame
    df.show()

def read_file_like_streaming(spark_session, customSchema, format, checkpoint_location, directory_to_save_files):
    
    # Read the DataFrame as a continuous stream
    streaming_df = spark_session \
                    .readStream \
                    .schema(customSchema) \
                    .format(format) \
                    .option("header", "true") \
                    .load(folder_files_path)

    # Display the DataFrame in the console and count the records per microbatch
    # .trigger(processingTime='5 seconds') specifies that Spark Structured Streaming should process micro-batches of data from the specified directory every 5 seconds.
    query = streaming_df \
        .writeStream \
        .outputMode("append") \
        .trigger(processingTime='5 seconds') \
        .option("checkpointLocation", checkpoint_location) \
        .option("basePath", directory_to_save_files) \
        .foreachBatch(lambda df, epoch_id: process_batch(directory_to_save_files, df, epoch_id)) \
        .start()
    
    # Wait for the streaming to complete
    #query.awaitTermination()

# Define the schema
customSchema = StructType([
    StructField("id", IntegerType(), True),
    StructField("secure_code", StringType(), True),
    StructField("airline", StringType(), True),
    StructField("departure_city", StringType(), True),
    StructField("departure_date", StringType(), True),
    StructField("arrival_airport", StringType(), True),
    StructField("arrival_city", StringType(), True),
    StructField("arrival_time", StringType(), True),
    StructField("passenger_name", StringType(), True),
    StructField("passenger_gender", StringType(), True),
    StructField("seat_number", StringType(), True),
    StructField("currency", StringType(), True),
    StructField("departure_gate", StringType(), True),
    StructField("flight_status", StringType(), True),
    StructField("co_pilot_name", StringType(), True),
    StructField("aircraft_type", StringType(), True),
    StructField("fuel_consumption", DoubleType(), True),
    StructField("flight_id", IntegerType(), True),
    StructField("flight_number", IntegerType(), True),
    StructField("departure_airport", StringType(), True),
    StructField("departure_country", StringType(), True),
    StructField("departure_time", StringType(), True),
    StructField("arrival_country", StringType(), True),
    StructField("arrival_date", StringType(), True),
    StructField("flight_duration", DoubleType(), True),
    StructField("passenger_age", IntegerType(), True),
    StructField("passenger_nationality", StringType(), True),
    StructField("ticket_price", DoubleType(), True),
    StructField("baggage_weight", DoubleType(), True),
    StructField("arrival_gate", StringType(), True),
    StructField("pilot_name", StringType(), True),
    StructField("cabin_crew_count", IntegerType(), True),
    StructField("aircraft_registration", StringType(), True),
    StructField("flight_distance", DoubleType(), True)
])

format = 'csv'
folder_files_path = "/notebooks"
directory_to_save_files = "/usr/local/kafka/data/files/batch"
shutil.rmtree(directory_to_save_files, ignore_errors=True)

# Delete the checkpoint directory
checkpoint_location = "/usr/local/kafka/data/batch"
shutil.rmtree(checkpoint_location, ignore_errors=True)

read_file_like_streaming(spark_session=spark_session, customSchema=customSchema, format=format, checkpoint_location=checkpoint_location, directory_to_save_files=directory_to_save_files)

24/01/10 02:43:57 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/01/10 02:43:57 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Processing microbatch 0 at 2024_01_10_02_43_57


24/01/10 02:43:58 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 1, schema size: 34
CSV file: file:///notebooks/Spark-Kafka-Batch-Streaming-MlLib.ipynb
24/01/10 02:43:58 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 1, schema size: 34
CSV file: file:///notebooks/Untitled.ipynb


+--------+---------+---------------------+----------------+-------------+----------------+------------------+----------------+
|batch_id|timestamp|passenger_nationality|passenger_gender|passenger_age|avg_ticket_price|total_ticket_price|total_passengers|
+--------+---------+---------------------+----------------+-------------+----------------+------------------+----------------+
+--------+---------+---------------------+----------------+-------------+----------------+------------------+----------------+



24/01/10 02:43:59 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 1, schema size: 34
CSV file: file:///notebooks/Spark-Kafka-Batch-Streaming-MlLib.ipynb
24/01/10 02:43:59 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 1, schema size: 34
CSV file: file:///notebooks/Untitled.ipynb
24/01/10 02:44:00 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 1, schema size: 34
CSV file: file:///notebooks/Spark-Kafka-Batch-Streaming-MlLib.ipynb


+----+-----------+-------+--------------+--------------+---------------+------------+------------+--------------+----------------+-----------+--------+--------------+-------------+-------------+-------------+----------------+---------+-------------+-----------------+-----------------+--------------+---------------+------------+---------------+-------------+---------------------+------------+--------------+------------+----------+----------------+---------------------+---------------+--------+-------------------+
|  id|secure_code|airline|departure_city|departure_date|arrival_airport|arrival_city|arrival_time|passenger_name|passenger_gender|seat_number|currency|departure_gate|flight_status|co_pilot_name|aircraft_type|fuel_consumption|flight_id|flight_number|departure_airport|departure_country|departure_time|arrival_country|arrival_date|flight_duration|passenger_age|passenger_nationality|ticket_price|baggage_weight|arrival_gate|pilot_name|cabin_crew_count|aircraft_registration|flight_dis

# COPY, PASTE, AND RUN THE FOLLOWING CODE IN ANOTHER NOTEBOOK TO OBSERVE STREAMING REDING

```python
import pandas as pd
import numpy as np
from time import sleep

for index in range(1,6):

    part_1 = index
    part_2 = index + 5

    file_name = 'flight_logs'
    final_file = f'{file_name}_{index}.csv'
    
    file_1 = f'{file_name}_part_1_{index}.csv'
    base_url1 = f'https://raw.githubusercontent.com/JorgeCardona/recursos/main/datasets/{file_1}'
    df1 = pd.read_csv(base_url1)
    
    file_2 = f'{file_name}_part_2_{index}.csv'
    base_url2 = f'https://raw.githubusercontent.com/JorgeCardona/recursos/main/datasets/{file_2}'
    df2 = pd.read_csv(base_url2)

    df3 = pd.merge(df1, df2, left_on='id', right_on='flight_id', how='inner')
    df3.to_csv(f'{final_file}',index=False)

    print(f'{final_file} saved Successfully!!')
    sleep(5)
```

# LOAD PARQUET FILES DIRECTORY ON DATAFRAME

In [7]:
import pandas as pd
import pyarrow.parquet as pq
import glob

def read_parquet_directory_into_dataframe(directory_path):
    """
    Reads all Parquet files from a directory into a single pandas DataFrame.

    Parameters:
    - directory_path (str): Path to the directory containing Parquet files.

    Returns:
    - pd.DataFrame: Combined DataFrame containing data from all Parquet files.
    """
    # Use glob to get a list of all Parquet file paths in the specified directory
    parquet_files = glob.glob(f'{directory_path}/*.parquet')
    
    # Initialize an empty list to store individual DataFrames
    dfs = []
    
    # Loop through each Parquet file and read it into a DataFrame
    for parquet_file in parquet_files:
        # Read Parquet file into a DataFrame
        df = pq.read_table(parquet_file).to_pandas()
        
        # Append the DataFrame to the list
        dfs.append(df)
    
    # Concatenate all DataFrames into a single DataFrame
    all_data = pd.concat(dfs, ignore_index=True)
    
    return all_data

# Specify the directory where Parquet files are located
directory_to_save_files = "/usr/local/kafka/data/files/batch"
microbatch_parquet_directory_name = 'microbatch_3_2024_01_10_02_44_30'
directory_path = f'{directory_to_save_files}/{microbatch_parquet_directory_name}/'

read_parquet_directory_into_dataframe(directory_path)

Unnamed: 0,batch_id,timestamp,passenger_nationality,passenger_gender,passenger_age,avg_ticket_price,total_ticket_price,total_passengers
0,3,2024_01_10_02_44_30,Indonesia,Female,57,293.1,879.3,3
1,3,2024_01_10_02_44_30,China,Female,80,473.9875,1895.95,4
2,3,2024_01_10_02_44_30,China,Male,33,448.2925,1793.17,4
3,3,2024_01_10_02_44_30,China,Female,38,330.07,990.21,3
4,3,2024_01_10_02_44_30,China,Female,32,481.443333,1444.33,3
5,3,2024_01_10_02_44_30,China,Female,18,560.283333,1680.85,3
6,3,2024_01_10_02_44_30,China,Female,79,263.6,790.8,3
7,3,2024_01_10_02_44_30,China,Female,56,464.856667,1394.57,3
8,3,2024_01_10_02_44_30,China,Male,29,579.73,1739.19,3
9,3,2024_01_10_02_44_30,China,Male,69,441.94,1325.82,3


# SPARK MLLIB + SCIKIT LEARN

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, StringType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import format_number
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

# Crear la sesión de Spark
spark = SparkSession.builder.master("local").appName("scikit-learn-with-spark").getOrCreate()

# Load the Iris dataset
iris = load_iris()
X, y = iris.data, iris.target

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Convert numpy values to Python floats
X_train = [Vectors.dense(features) for features in X_train]
y_train = [float(label) for label in y_train]  # Convert to float

# Create Spark DataFrames from training data
train_data = list(zip(X_train, y_train))
schema = StructType([StructField("features", VectorUDT(), True), StructField("label", FloatType(), True)])
train_df = spark.createDataFrame(train_data, schema=schema)

# Create and train a logistic regression model
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)
model = lr.fit(train_df)

# Now you can use the model to make predictions on new data (X_test)
X_test_spark = [Vectors.dense(features) for features in X_test]
test_data = list(zip(X_test_spark, [float(label) for label in y_test]))
test_schema = StructType([StructField("features", VectorUDT(), True), StructField("label", FloatType(), True)])
test_df = spark.createDataFrame(test_data, schema=test_schema)

# Make predictions on the test data
prediction_result = model.transform(test_df)
# Format prediction column to display two decimal places
prediction_result = prediction_result.withColumn("formatted_prediction", format_number("prediction", 2))

# Map numeric labels back to target names
target_names = ["setosa", "versicolor", "virginica"]
label_to_name_udf = udf(lambda label: target_names[int(label)], StringType())

# Create new columns with predicted and actual class names
prediction_result = prediction_result.withColumn("predicted_class_name", label_to_name_udf("prediction"))
prediction_result = prediction_result.withColumn("actual_class_name", label_to_name_udf("label"))

# Display the results
prediction_result.show()

regression_evaluator_mse = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="mse")
print("Mean Squared Error (Regression):", regression_evaluator_mse.evaluate(prediction_result))

regression_evaluator_rmse = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
print("Root Mean Squared Error (Regression):", regression_evaluator_rmse.evaluate(prediction_result))

regression_evaluator_mae = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="mae")
print("Mean Absolute Error (Regression):", regression_evaluator_mae.evaluate(prediction_result))

# Evaluación del rendimiento del modelo
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
accuracy = evaluator.evaluate(prediction_result)

# Model performance evaluation
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
accuracy = evaluator.evaluate(prediction_result)
print(f"Accuracy: {accuracy}")

# Binary Classification Evaluator for ROC
binary_evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label")
roc_auc = binary_evaluator.evaluate(prediction_result, {binary_evaluator.metricName: "areaUnderROC"})
print(f"Area Under ROC: {roc_auc}")

# F1 Score
evaluator_f1 = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="f1")
f1_score = evaluator_f1.evaluate(prediction_result)
print(f"F1 Score: {f1_score}")

# Precision
evaluator_precision = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="weightedPrecision")
precision = evaluator_precision.evaluate(prediction_result)
print(f"Precision: {precision}")

# Recall
evaluator_recall = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="weightedRecall")
recall = evaluator_recall.evaluate(prediction_result)
print(f"Recall: {recall}")

# R2 Score (Regression)
r2_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="r2")
print("R2 Score (Regression):", r2_evaluator.evaluate(prediction_result))

# Extract predictions and labels as RDD
prediction_and_label = prediction_result.select("prediction", "label").rdd.map(lambda row: (float(row["prediction"]), float(row["label"])))

# Create a MulticlassMetrics object
metrics = MulticlassMetrics(prediction_and_label)

# Output the confusion matrix
confusion_matrix = metrics.confusionMatrix().toArray()
print("Confusion Matrix:")
print(confusion_matrix)

# Display the results like a pandas DataFrame
df_pandas = prediction_result.toPandas()
df_pandas

+-----------------+-----+--------------------+--------------------+----------+--------------------+--------------------+-----------------+
|         features|label|       rawPrediction|         probability|prediction|formatted_prediction|predicted_class_name|actual_class_name|
+-----------------+-----+--------------------+--------------------+----------+--------------------+--------------------+-----------------+
|[6.1,2.8,4.7,1.2]|  1.0|[-0.7015267998759...|[0.25821627409246...|       1.0|                1.00|          versicolor|       versicolor|
|[5.7,3.8,1.7,0.3]|  0.0|[0.39468553474413...|[0.53313412218413...|       0.0|                0.00|              setosa|           setosa|
|[7.7,2.6,6.9,2.3]|  2.0|[-1.6418662900049...|[0.10701004791222...|       2.0|                2.00|           virginica|        virginica|
|[6.0,2.9,4.5,1.5]|  1.0|[-0.7400872505378...|[0.24477418451260...|       1.0|                1.00|          versicolor|       versicolor|
|[6.8,2.8,4.8,1.4]|  1.0|[-



Confusion Matrix:
[[10.  0.  0.]
 [ 0.  8.  1.]
 [ 0.  0. 11.]]


Unnamed: 0,features,label,rawPrediction,probability,prediction,formatted_prediction,predicted_class_name,actual_class_name
0,"[6.1, 2.8, 4.7, 1.2]",1.0,"[-0.7015267998759547, -0.29696087223267353, -0...","[0.2582162740924694, 0.3869762931285481, 0.354...",1.0,1.0,versicolor,versicolor
1,"[5.7, 3.8, 1.7, 0.3]",0.0,"[0.3946855347441336, -0.29696087223267353, -0....","[0.5331341221841369, 0.2669674182430638, 0.199...",0.0,0.0,setosa,setosa
2,"[7.7, 2.6, 6.9, 2.3]",2.0,"[-1.6418662900049688, -0.29696087223267353, -0...","[0.1070100479122216, 0.41068568020211077, 0.48...",2.0,2.0,virginica,virginica
3,"[6.0, 2.9, 4.5, 1.5]",1.0,"[-0.7400872505378162, -0.29696087223267353, -0...","[0.24477418451260918, 0.38125272214940775, 0.3...",1.0,1.0,versicolor,versicolor
4,"[6.8, 2.8, 4.8, 1.4]",1.0,"[-0.7907867010435061, -0.29696087223267353, -0...","[0.23752437379748587, 0.3892010598800947, 0.37...",1.0,1.0,versicolor,versicolor
5,"[5.4, 3.4, 1.5, 0.4]",0.0,"[0.4181480135099761, -0.29696087223267353, -0....","[0.5365586934364456, 0.26245170154053943, 0.20...",0.0,0.0,setosa,setosa
6,"[5.6, 2.9, 3.6, 1.3]",1.0,"[-0.4329315754514864, -0.29696087223267353, -0...","[0.3105622916277409, 0.35579517054527976, 0.33...",1.0,1.0,versicolor,versicolor
7,"[6.9, 3.1, 5.1, 2.3]",2.0,"[-1.151600798687717, -0.29696087223267353, -0....","[0.16364043077100981, 0.3846413924135498, 0.45...",2.0,2.0,virginica,virginica
8,"[6.2, 2.2, 4.5, 1.5]",1.0,"[-0.7400872505378162, -0.29696087223267353, -0...","[0.24477418451260918, 0.38125272214940775, 0.3...",1.0,1.0,versicolor,versicolor
9,"[5.8, 2.7, 3.9, 1.2]",1.0,"[-0.48363102595717633, -0.29696087223267353, -...","[0.3020901521359054, 0.36408801707524013, 0.33...",1.0,1.0,versicolor,versicolor


# CLOSE SPARK SESSION

In [9]:
spark_session.stop()