<a href="https://colab.research.google.com/github/hvr2026/Apache-Beam-Data-Engineering/blob/main/Apache%20beam%20features/Netflix_Apache_Beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
# Step 1: Install Apache Beam and pandas
!pip install apache-beam[gcp] pandas --quiet

In [49]:
# Import required libraries
import os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import pandas as pd
import json
import matplotlib.pyplot as plt
from collections import Counter


In [52]:
# Step 3: Define a custom ParDo for preprocessing
class PreprocessData(beam.DoFn):
    def process(self, element):
        try:
            record = json.loads(element)
            return [{
                'title': record.get('title', ''),
                'release_year': int(record.get('release_year', 0)) if record.get('release_year') else None,
                'duration': record.get('duration', ''),
                'type': record.get('type', '')
            }]
        except Exception as e:
            print(f"Error processing record: {e}")
            return []



In [53]:
# Step 4: Composite Transform - Extract Movies
class ExtractMovies(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | "Filter Movies" >> beam.Filter(lambda record: record and record.get('type') == 'Movie')
            | "Extract Movie Details" >> beam.Map(lambda record: {
                'title': record['title'],
                'release_year': record['release_year'],
                'duration': record['duration']
            })
        )


In [54]:

# Step 5: Simulate Streaming Data Source from the Netflix dataset
def generate_stream_data(file_path):
    if not os.path.exists(file_path):
        print(f"Error: File {file_path} not found. Ensure the file is in the correct location.")
        return []

    df = pd.read_csv(file_path)
    for _, row in df.iterrows():
        yield json.dumps(row.to_dict())  # Convert each row to JSON for processing


In [55]:

# Step 6: Visualization Helper Function
def plot_release_year_distribution(data):
    if not data:
        print("No data to visualize.")
        return

    years = [record['release_year'] for record in data if record.get('release_year')]
    year_counts = Counter(years)
    plt.bar(year_counts.keys(), year_counts.values())
    plt.xlabel("Release Year")
    plt.ylabel("Count")
    plt.title("Distribution of Release Years")
    plt.show()


In [56]:

# Step 7: Run the Apache Beam pipeline
def run_pipeline(file_path):
    if not os.path.exists(file_path):
        print(f"Error: File {file_path} not found. Please provide a valid file.")
        return

    options = PipelineOptions(streaming=False, save_main_session=True)
    collected_data = []

    with beam.Pipeline(options=options) as pipeline:
        # Step 7.1: Generate streaming data from the dataset
        raw_stream = (
            pipeline
            | "Generate Streaming Data" >> beam.Create(list(generate_stream_data(file_path)))
        )

        # Step 7.2: Apply ParDo for preprocessing
        preprocessed_data = (
            raw_stream
            | "Preprocess Data" >> beam.ParDo(PreprocessData())
            | "Filter None Values" >> beam.Filter(lambda record: record is not None)  # Filter None
            | "Debug Preprocessed Data" >> beam.Map(print)  # Debug Preprocessed Data
        )

        # Step 7.3: Apply composite transform to extract movie data
        movies_data = (
            preprocessed_data
            | "Extract Movies" >> ExtractMovies()
            | "Debug Movies Data" >> beam.Map(print)  # Debug Movies Data
        )



In [57]:
# Step 8: Execute the pipeline with the Netflix dataset
file_path = "/content/netflix_titles.csv"  # Update to the correct path for your dataset
run_pipeline(file_path)



[1;30;43mStreaming output truncated to the last 5000 lines.[0m
{'title': 'Rosario Tijeras', 'release_year': 2018, 'duration': '2 Seasons', 'type': 'TV Show'}
{'title': '1994', 'release_year': 2019, 'duration': '1 Season', 'type': 'TV Show'}
{'title': 'Born in Gaza', 'release_year': 2014, 'duration': '70 min', 'type': 'Movie'}
{'title': 'Born in Syria', 'release_year': 2016, 'duration': '87 min', 'type': 'Movie'}
{'title': 'Dying to Tell', 'release_year': 2018, 'duration': '88 min', 'type': 'Movie'}
{'title': "It's Bruno!", 'release_year': 2019, 'duration': '1 Season', 'type': 'TV Show'}
{'title': 'Maria', 'release_year': 2019, 'duration': '90 min', 'type': 'Movie'}
{'title': "ReMastered: The Lion's Share", 'release_year': 2019, 'duration': '84 min', 'type': 'Movie'}
{'title': 'Saverio Raimondo: Il Satiro Parlante', 'release_year': 2019, 'duration': '53 min', 'type': 'Movie'}
{'title': 'See You Yesterday', 'release_year': 2019, 'duration': '87 min', 'type': 'Movie'}
{'title': 'White G