<a href="https://colab.research.google.com/github/ankitojha2705/Apache_Beam_data/blob/main/Apache_Beam_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Let's start with the initial Colab notebook setup for the D3.js visualizations:

In [None]:
import kagglehub

# Fetch the Netflix titles dataset from Kaggle; kagglehub returns the local
# directory the files were downloaded to.
path = kagglehub.dataset_download("shivamb/netflix-shows")
print(f"Path to dataset files: {path}")


Path to dataset files: /root/.cache/kagglehub/datasets/shivamb/netflix-shows/versions/5


In [None]:
import os

import pandas as pd

# Load netflix_titles.csv from the directory kagglehub returned (`path`, set in
# the download cell above) instead of a hard-coded cache location, which breaks
# as soon as the dataset version or the cache directory changes.
df = pd.read_csv(os.path.join(path, 'netflix_titles.csv'))
print(df.head())  # Display the first few rows to confirm it loaded correctly


  show_id     type                  title         director  \
0      s1    Movie   Dick Johnson Is Dead  Kirsten Johnson   
1      s2  TV Show          Blood & Water              NaN   
2      s3  TV Show              Ganglands  Julien Leclercq   
3      s4  TV Show  Jailbirds New Orleans              NaN   
4      s5  TV Show           Kota Factory              NaN   

                                                cast        country  \
0                                                NaN  United States   
1  Ama Qamata, Khosi Ngema, Gail Mabalane, Thaban...   South Africa   
2  Sami Bouajila, Tracy Gotoas, Samuel Jouy, Nabi...            NaN   
3                                                NaN            NaN   
4  Mayur More, Jitendra Kumar, Ranjan Raj, Alam K...          India   

           date_added  release_year rating   duration  \
0  September 25, 2021          2020  PG-13     90 min   
1  September 24, 2021          2021  TV-MA  2 Seasons   
2  September 24, 2021        

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime
import warnings
# NOTE(review): this silences *every* warning for the rest of the kernel session
# (including pandas FutureWarning/DeprecationWarning messages that flag code
# which will break on upgrade); consider narrowing it to specific categories.
warnings.filterwarnings('ignore')

# Data Preprocessing
def clean_and_prepare_data(df):
    """Return a cleaned copy of the raw Netflix titles DataFrame.

    The input frame is not modified. The copy gets:
        date_added:      parsed to datetime (unparseable values become NaT).
        added_year/added_month: extracted from `date_added` (NaN where NaT).
        duration_number: first integer found in `duration`, as float (e.g. 90.0, 2.0).
        duration_unit:   the text after that number, stripped (e.g. 'min', 'Seasons').
        country:         missing values replaced with 'Unknown'.

    Args:
        df: DataFrame with `date_added`, `duration` and `country` columns.

    Returns:
        The cleaned copy.
    """
    df_cleaned = df.copy()

    # Strip stray whitespace from string dates before parsing. Only real strings
    # are touched, so already-parsed Timestamps or missing values pass through.
    raw_dates = df_cleaned['date_added'].map(
        lambda value: value.strip() if isinstance(value, str) else value)
    df_cleaned['date_added'] = pd.to_datetime(raw_dates, format='mixed', errors='coerce')

    df_cleaned['added_year'] = df_cleaned['date_added'].dt.year
    df_cleaned['added_month'] = df_cleaned['date_added'].dt.month

    # Run the regex once and reuse both capture groups. astype(str) turns missing
    # durations into 'nan'/'None' strings, which do not match and yield NaN.
    duration_parts = df_cleaned['duration'].astype(str).str.extract(r'(\d+)\s*(\D+)')
    df_cleaned['duration_number'] = duration_parts[0].astype(float)
    df_cleaned['duration_unit'] = duration_parts[1].str.strip()

    df_cleaned['country'] = df_cleaned['country'].fillna('Unknown')

    return df_cleaned

# Basic Analysis Functions
def get_basic_stats(df):
    """Summarise the dataset as a dict of headline figures.

    Keys: total title count, Movie and TV Show counts, number of distinct
    `country` values, the release-year span as "min - max", and the sorted
    list of distinct non-missing ratings.
    """
    content_type = df['type']
    release_years = df['release_year']
    distinct_ratings = sorted(set(df['rating'].dropna()))

    return {
        'Total Shows': len(df),
        'Movies': int((content_type == 'Movie').sum()),
        'TV Shows': int((content_type == 'TV Show').sum()),
        'Unique Countries': df['country'].nunique(),
        'Year Range': f"{release_years.min()} - {release_years.max()}",
        'Content Rating Range': distinct_ratings,
    }

# Visualization Functions
def plot_content_distribution(df):
    """Pie chart showing how many titles fall under each `type` value."""
    type_counts = df['type'].value_counts()
    return px.pie(
        names=type_counts.index,
        values=type_counts.values,
        title='Distribution of Content Type',
    )

def plot_yearly_additions(df):
    """Line chart of titles added per `added_year`; rows without a year are skipped."""
    has_year = df['added_year'].notna()
    per_year = (
        df.loc[has_year]
        .groupby('added_year')
        .size()
        .reset_index(name='count')
    )
    return px.line(
        per_year,
        x='added_year',
        y='count',
        title='Content Added by Year',
        labels={'added_year': 'Year', 'count': 'Number of Titles'},
    )

def plot_rating_distribution(df):
    """Bar chart of title counts per `rating`, most common rating first."""
    counts_by_rating = df['rating'].value_counts()
    return px.bar(
        x=counts_by_rating.index,
        y=counts_by_rating.values,
        title='Content Rating Distribution',
        labels={'x': 'Rating', 'y': 'Count'},
    )

def plot_country_distribution(df, top_n=10):
    """Bar chart of the countries credited on the most titles.

    A `country` entry may list several countries separated by commas
    (co-productions). Entries are split on commas and each listed country is
    counted individually. Otherwise a combination such as "A, B" would be ranked
    as its own pseudo-country, and A and B would each be undercounted.

    Args:
        df: DataFrame with a `country` column of strings.
        top_n: number of countries to show (default 10, as before).

    Returns:
        A plotly bar figure.
    """
    countries = (
        df['country']
        .dropna()
        .str.split(',')
        .explode()
        .str.strip()
    )
    # Trailing commas in the raw text would otherwise produce empty names.
    countries = countries[countries != '']
    top_countries = countries.value_counts().head(top_n)

    fig = px.bar(x=top_countries.index, y=top_countries.values,
                 title=f'Top {top_n} Countries by Content Production',
                 labels={'x': 'Country', 'y': 'Number of Titles'})
    fig.update_xaxes(tickangle=45)
    return fig

def plot_duration_distribution(df):
    """Overlaid histograms of `duration_number` for movies and TV shows.

    Movies contribute minutes and TV shows contribute season counts. A trace is
    added only when that type has at least one non-missing duration.
    NOTE(review): both units share one x axis, so the season histogram is
    squeezed near zero next to the minute values; consider separate subplots.
    """
    fig = go.Figure()

    trace_specs = (
        ('Movie', 'Movies (minutes)'),
        ('TV Show', 'TV Shows (seasons)'),
    )
    for content_type, trace_name in trace_specs:
        durations = df.loc[df['type'] == content_type, 'duration_number'].dropna()
        if not durations.empty:
            fig.add_trace(go.Histogram(x=durations, name=trace_name, opacity=0.75))

    fig.update_layout(
        title='Duration Distribution',
        barmode='overlay',
        xaxis_title='Duration',
        yaxis_title='Count',
    )
    return fig

def generate_complete_eda(df):
    """Run the basic EDA end to end: clean the data, compute stats, build plots.

    Args:
        df: raw Netflix titles DataFrame; cleaning is done by
            clean_and_prepare_data, which works on a copy.

    Returns:
        dict with 'stats' (from get_basic_stats) and 'plots', mapping a plot
        name to the figure returned by the corresponding plot_* function.

    Raises:
        Re-raises any exception after printing its message, type and traceback.
    """
    import traceback

    try:
        # Print initial data info for debugging. DataFrame.info() writes its
        # report itself and returns None, so wrapping it in print() would emit
        # a stray "None" line.
        print("Initial data info:")
        df.info()

        df_cleaned = clean_and_prepare_data(df)
        stats = get_basic_stats(df_cleaned)

        plots = {
            'content_distribution': plot_content_distribution(df_cleaned),
            'yearly_additions': plot_yearly_additions(df_cleaned),
            'rating_distribution': plot_rating_distribution(df_cleaned),
            'country_distribution': plot_country_distribution(df_cleaned),
            'duration_distribution': plot_duration_distribution(df_cleaned),
        }

        return {'stats': stats, 'plots': plots}
    except Exception as e:
        print(f"Error during EDA generation: {str(e)}")
        print(f"Error type: {type(e)}")
        print(traceback.format_exc())
        raise

In [None]:
# Sanity-check the raw frame before running the EDA
print("Dataset shape:", df.shape)
print("\nDataset columns:", df.columns.tolist())
print("\nFirst few rows of duration column:")
print(df['duration'].head())

# Clean the data, compute summary statistics and build every plot
results = generate_complete_eda(df)

print("\nBasic Statistics:")
for stat_name, stat_value in results['stats'].items():
    print(f"{stat_name}: {stat_value}")

# Render each figure in plot order
for figure in results['plots'].values():
    figure.show()

Dataset shape: (8807, 12)

Dataset columns: ['show_id', 'type', 'title', 'director', 'cast', 'country', 'date_added', 'release_year', 'rating', 'duration', 'listed_in', 'description']

First few rows of duration column:
0       90 min
1    2 Seasons
2     1 Season
3     1 Season
4    2 Seasons
Name: duration, dtype: object
Initial data info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8807 entries, 0 to 8806
Data columns (total 12 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   show_id       8807 non-null   object
 1   type          8807 non-null   object
 2   title         8807 non-null   object
 3   director      6173 non-null   object
 4   cast          7982 non-null   object
 5   country       7976 non-null   object
 6   date_added    8797 non-null   object
 7   release_year  8807 non-null   int64 
 8   rating        8803 non-null   object
 9   duration      8804 non-null   object
 10  listed_in     8807 non-null   object
 11

# Advanced Netflix Shows Analysis

In [None]:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.figure_factory as ff

def clean_dates(df):
    """Parse the `date_added` column into datetimes without modifying `df`.

    Surrounding whitespace is stripped from string values before parsing.
    Values that cannot be parsed become NaT (errors='coerce').

    Args:
        df: DataFrame with a `date_added` column.

    Returns:
        pd.Series of datetimes aligned with df's index.
    """
    # Strip only real strings: .str.strip() would turn non-string entries
    # (e.g. values that are already Timestamps) into NaN. Building a new Series
    # also avoids rewriting the caller's `date_added` column as a side effect.
    dates = df['date_added'].map(lambda value: value.strip() if isinstance(value, str) else value)
    return pd.to_datetime(dates, format='mixed', errors='coerce')

def analyze_content_trends(df):
    """Line chart of titles added per year, one line per content type.

    Works on a copy of `df`. Rows whose `date_added` cannot be parsed are dropped.
    """
    working = df.copy()
    working['year'] = clean_dates(working).dt.year
    working = working.dropna(subset=['year'])

    counts = working.groupby(['year', 'type']).size().unstack(fill_value=0)

    fig = go.Figure(data=[
        go.Scatter(
            x=counts.index,
            y=counts[content_type],
            name=content_type,
            mode='lines+markers',
        )
        for content_type in counts.columns
    ])
    fig.update_layout(
        title='Content Addition Trends by Type Over Time',
        xaxis_title='Year',
        yaxis_title='Number of Titles Added',
        hovermode='x unified',
    )
    return fig

def analyze_ratings_by_type(df):
    """Grouped bar chart of title counts per rating, with one bar series per content type."""
    counts = pd.crosstab(index=df['rating'], columns=df['type'])

    bars = [
        go.Bar(name=content_type, x=counts.index, y=counts[content_type])
        for content_type in counts.columns
    ]
    fig = go.Figure(data=bars)
    fig.update_layout(
        barmode='group',
        title='Content Ratings Distribution by Type',
        xaxis_title='Rating',
        yaxis_title='Count',
    )
    return fig

def create_genre_analysis(df):
    """Bar chart of the 15 most common genres.

    Each title's `listed_in` text is split on commas and every piece is counted
    after stripping whitespace. Missing `listed_in` values count as 'Unknown'.
    """
    split_genres = df['listed_in'].fillna('Unknown').str.split(',', expand=True)
    genre_counts = split_genres.stack().str.strip().value_counts()
    top_genres = genre_counts.head(15)

    fig = px.bar(
        x=top_genres.index,
        y=top_genres.values,
        title='Top 15 Genres on Netflix',
        labels={'x': 'Genre', 'y': 'Number of Titles'},
    )
    fig.update_xaxes(tickangle=45)
    return fig

def analyze_release_patterns(df):
    """Line chart of how many titles were added in each calendar month.

    Counts are pooled across all years. Rows whose `date_added` cannot be parsed
    are ignored, and the caller's DataFrame is not modified.
    """
    # Work on a copy so any cleaning done by clean_dates stays local.
    df = df.copy()
    months = clean_dates(df).dt.month.dropna().astype(int)

    # Align counts to months 1..12 explicitly, using 0 for months with no
    # additions, so every label is paired with its own month's count. Pairing
    # the 12 labels positionally with the groupby output raised or mislabelled
    # as soon as any month was absent.
    monthly_additions = months.value_counts().reindex(range(1, 13), fill_value=0)

    month_labels = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                    'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']

    fig = px.line(
        x=month_labels,
        y=monthly_additions.values,
        title='Content Addition Patterns by Month',
        labels={'x': 'Month', 'y': 'Number of Titles Added'}
    )
    return fig

def analyze_content_maturity(df):
    """Heatmap of title counts per (year added, rating).

    Works on a copy. Rows lacking a parseable `date_added` or a `rating` are dropped.
    """
    frame = df.copy()
    frame['year'] = clean_dates(frame).dt.year
    frame = frame.dropna(subset=['year', 'rating'])

    counts = pd.crosstab(index=frame['year'], columns=frame['rating'])

    return px.imshow(
        counts,
        title='Content Ratings Distribution Over Time',
        labels=dict(x='Rating', y='Year', color='Number of Titles'),
    )

def generate_advanced_analysis(df):
    """Build every advanced plot for the raw Netflix DataFrame.

    Returns:
        dict mapping plot name -> figure, in display order.

    Raises:
        Re-raises any exception. Before doing so it prints the message and,
        when the column exists, a sample of `date_added` (several of the
        helpers parse that column).
    """
    try:
        print("Starting advanced analysis...")
        print("Shape of dataset:", df.shape)
        print("Columns:", df.columns.tolist())

        return {
            'content_trends': analyze_content_trends(df),
            'ratings_distribution': analyze_ratings_by_type(df),
            'genre_analysis': create_genre_analysis(df),
            'seasonal_patterns': analyze_release_patterns(df),
            'maturity_distribution': analyze_content_maturity(df),
        }
    except Exception as e:
        print(f"Error in advanced analysis: {str(e)}")
        # Guard the diagnostic so that a missing column (or a non-DataFrame
        # input) cannot raise a fresh error here and hide the original one.
        if 'date_added' in getattr(df, 'columns', ()):
            print("Data sample:")
            print(df['date_added'].head())
        raise

# Additional analysis function
def analyze_descriptions(df):
    """Histogram of description word counts, split by content type.

    Word counts are computed on whitespace-split `description` text, with
    missing descriptions counted as 0 words. The count column is attached only
    to a new frame passed to plotly, so the caller's DataFrame is left unchanged.
    """
    word_counts = df['description'].fillna('').str.split().str.len()
    plot_df = df.assign(description_word_count=word_counts)

    fig = px.histogram(
        plot_df,
        x='description_word_count',
        color='type',
        title='Distribution of Description Lengths by Content Type',
        labels={'description_word_count': 'Number of Words', 'count': 'Frequency'},
        nbins=50
    )
    return fig

In [None]:
# Inspect how date_added is formatted before any parsing
print("Sample of date_added column:")
print(df['date_added'].head(10))
print("\nDate column info:")
print(df['date_added'].describe())

# Build and display every advanced plot
advanced_results = generate_advanced_analysis(df)
for plot_name, figure in advanced_results.items():
    print(f"\nDisplaying {plot_name}")
    figure.show()

# Word-count distribution of the descriptions
print("\nGenerating description analysis...")
description_plot = analyze_descriptions(df)
description_plot.show()

# Headline counts by content type
type_counts = df['type'].value_counts()
print("\nContent Distribution Insights:")
print(f"Total number of titles: {len(df)}")
print(f"Movies: {type_counts.get('Movie', 0)}")
print(f"TV Shows: {type_counts.get('TV Show', 0)}")

Sample of date_added column:
0    September 25, 2021
1    September 24, 2021
2    September 24, 2021
3    September 24, 2021
4    September 24, 2021
5    September 24, 2021
6    September 24, 2021
7    September 24, 2021
8    September 24, 2021
9    September 24, 2021
Name: date_added, dtype: object

Date column info:
count                8797
unique               1767
top       January 1, 2020
freq                  109
Name: date_added, dtype: object
Starting advanced analysis...
Shape of dataset: (8807, 12)
Columns: ['show_id', 'type', 'title', 'director', 'cast', 'country', 'date_added', 'release_year', 'rating', 'duration', 'listed_in', 'description']

Displaying content_trends



Displaying ratings_distribution



Displaying genre_analysis



Displaying seasonal_patterns



Displaying maturity_distribution



Generating description analysis...



Content Distribution Insights:
Total number of titles: 8807
Movies: 6131
TV Shows: 2676


# b) Assignment 2 - Auto EDA with your favorite tool

- First, let's do the Auto EDA using Pandas Profiling (now called ydata-profiling):

In [None]:
# Install Apache Beam (used by the Assignment 3 pipeline below)
!pip install apache-beam

# Install ydata-profiling (formerly pandas-profiling) for the automated EDA report
!pip install ydata-profiling
# Install required packages
# NOTE(review): sweetviz is installed but none of the cells shown here import it;
# consider dropping it. None of these installs pin a version, so a re-run may pull newer releases.
!pip install sweetviz

Collecting sweetviz
  Downloading sweetviz-2.3.1-py3-none-any.whl.metadata (24 kB)
Downloading sweetviz-2.3.1-py3-none-any.whl (15.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m15.1/15.1 MB[0m [31m18.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: sweetviz
Successfully installed sweetviz-2.3.1




## Netflix Auto EDA


## We need to load the dataset first

In [None]:
# Import required libraries and load the dataset
import os

import kagglehub
import pandas as pd

# Download the dataset; kagglehub returns the local directory holding its files
path = kagglehub.dataset_download("shivamb/netflix-shows")
print("Path to dataset files:", path)

# Load the CSV from the directory kagglehub reported rather than a hard-coded
# cache path, which breaks when the dataset version or cache location changes.
df = pd.read_csv(os.path.join(path, 'netflix_titles.csv'))

# Verify the data is loaded
print("\nDataset shape:", df.shape)
print("\nFirst few rows:")
print(df.head())

Path to dataset files: /root/.cache/kagglehub/datasets/shivamb/netflix-shows/versions/5

Dataset shape: (8807, 12)

First few rows:
  show_id     type                  title         director  \
0      s1    Movie   Dick Johnson Is Dead  Kirsten Johnson   
1      s2  TV Show          Blood & Water              NaN   
2      s3  TV Show              Ganglands  Julien Leclercq   
3      s4  TV Show  Jailbirds New Orleans              NaN   
4      s5  TV Show           Kota Factory              NaN   

                                                cast        country  \
0                                                NaN  United States   
1  Ama Qamata, Khosi Ngema, Gail Mabalane, Thaban...   South Africa   
2  Sami Bouajila, Tracy Gotoas, Samuel Jouy, Nabi...            NaN   
3                                                NaN            NaN   
4  Mayur More, Jitendra Kumar, Ranjan Raj, Alam K...          India   

           date_added  release_year rating   duration  \
0  Septembe

## Netflix Auto EDA: helper functions

In [None]:
import pandas as pd
from ydata_profiling import ProfileReport
import matplotlib.pyplot as plt
import seaborn as sns

def generate_simple_eda(df):
    """
    Generate a simple automated EDA report

    Writes a minimal ydata-profiling HTML report and a bar chart of content
    types to the current working directory, then returns a small summary.

    Args:
        df: Netflix titles DataFrame with `type`, `country`, `release_year`
            and `rating` columns.

    Returns:
        dict with 'summary' (headline statistics) and 'files_generated'
        (names of the files written).

    Raises:
        Re-raises any exception after printing its message and type.
    """
    try:
        print("Starting Simple Auto EDA generation...")

        # 1. Generate Pandas Profiling Report
        print("\nGenerating YData Profiling Report...")
        profile = ProfileReport(df,
                                title="Netflix Content Analysis",
                                minimal=True)  # minimal=True for faster processing
        profile.to_file("netflix_profiling_report.html")
        print("✓ YData Profiling Report generated successfully")

        # 2. Generate Basic Visualizations
        print("\nGenerating Basic Visualizations...")
        fig, ax = plt.subplots(figsize=(10, 6))
        try:
            df['type'].value_counts().plot(kind='bar', ax=ax)
            ax.set_title('Distribution of Content Type')
            ax.set_xlabel('Content Type')
            ax.set_ylabel('Number of Titles')
            fig.tight_layout()
            fig.savefig('content_distribution.png')
        finally:
            # Release the figure even if plotting or saving fails, so repeated
            # runs don't accumulate open matplotlib figures.
            plt.close(fig)

        # 3. Generate Quick Summary
        summary = {
            'Total Content': len(df),
            'Content Types': df['type'].value_counts().to_dict(),
            'Unique Countries': df['country'].nunique(),
            'Release Years': f"{df['release_year'].min()} - {df['release_year'].max()}",
            'Top Ratings': df['rating'].value_counts().head().to_dict()
        }

        return {
            'summary': summary,
            'files_generated': [
                'netflix_profiling_report.html',
                'content_distribution.png'
            ]
        }

    except Exception as e:
        print(f"Error in Auto EDA generation: {str(e)}")
        print(f"Error type: {type(e).__name__}")
        raise

def print_eda_summary(results):
    """Print a formatted summary of the EDA results

    Expects the dict returned by generate_simple_eda: a 'summary' mapping
    (with 'Total Content', 'Content Types', 'Unique Countries',
    'Release Years', 'Top Ratings') and a 'files_generated' list.
    """
    summary = results['summary']

    def emit_counts(mapping):
        # One "label: count" line per entry, in the mapping's own order.
        for label, count in mapping.items():
            print(f"{label}: {count}")

    print("\n=== NETFLIX CONTENT ANALYSIS SUMMARY ===")
    print(f"\nTotal Content: {summary['Total Content']}")

    print("\nContent Distribution:")
    emit_counts(summary['Content Types'])

    print(f"\nContent from {summary['Unique Countries']} unique countries")
    print(f"Release Years: {summary['Release Years']}")

    print("\nTop Content Ratings:")
    emit_counts(summary['Top Ratings'])

    print("\nGenerated Files:")
    for file_name in results['files_generated']:
        print(f"- {file_name}")

# Function to run the complete analysis
def run_simple_eda(df):
    """Run the complete EDA process: generate the report and files, then print a summary.

    Returns the dict produced by generate_simple_eda.
    """
    print("Starting EDA process...")
    eda_results = generate_simple_eda(df)

    print("\nGenerating summary...")
    print_eda_summary(eda_results)

    return eda_results

## Let's run the analysis:

In [None]:
# Run the simple Auto EDA: writes netflix_profiling_report.html and
# content_distribution.png to the working directory, then prints a summary.
# NOTE: this rebinds `results`, replacing the dict from the earlier basic-EDA cell.
results = run_simple_eda(df)

Starting EDA process...
Starting Simple Auto EDA generation...

Generating YData Profiling Report...


Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

✓ YData Profiling Report generated successfully

Generating Basic Visualizations...

Generating summary...

=== NETFLIX CONTENT ANALYSIS SUMMARY ===

Total Content: 8807

Content Distribution:
Movie: 6131
TV Show: 2676

Content from 748 unique countries
Release Years: 1925 - 2021

Top Content Ratings:
TV-MA: 3207
TV-14: 2160
TV-PG: 863
R: 799
PG-13: 490

Generated Files:
- netflix_profiling_report.html
- content_distribution.png


# c) Assignment 3 - Apache Beam features: demonstrate Apache Beam in Colab, including a `ParDo` CSV parser, composite `PTransform`s, and `CombinePerKey` aggregations

In [None]:
# apache-beam is also installed in the Assignment 2 setup cell; repeating it here
# keeps this section runnable on its own and is harmless if already installed.
!pip install apache-beam



## Netflix Apache Beam Pipeline

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AfterWatermark, AfterCount
from apache_beam.utils.timestamp import Timestamp
import datetime
import json
import csv

class ParseCSV(beam.DoFn):
    """Parse one line of netflix_titles.csv into a dict keyed by column name.

    `release_year` is converted to int (None when the field is not a plain
    number); every other field stays a string. Nothing is emitted for lines
    that fail to parse or that do not have exactly one field per column. Such
    lines are reported rather than silently dropped.

    NOTE(review): ReadFromText splits the file on newlines, so a quoted CSV
    field containing a line break would arrive here as several fragments and
    be skipped. That may explain why the Beam type counts differ slightly from
    the pandas counts; confirm against the data.
    """

    # Column order of netflix_titles.csv.
    FIELD_NAMES = ('show_id', 'type', 'title', 'director', 'cast', 'country',
                   'date_added', 'release_year', 'rating', 'duration',
                   'listed_in', 'description')

    def process(self, element):
        try:
            row = next(csv.reader([element]))
        except Exception as e:
            print(f"Error parsing CSV: {str(e)}")
            return []

        if len(row) != len(self.FIELD_NAMES):
            # Report malformed lines so data loss is visible instead of silent.
            print(f"Skipping CSV line with {len(row)} fields "
                  f"(expected {len(self.FIELD_NAMES)}): {element[:80]!r}")
            return []

        record = dict(zip(self.FIELD_NAMES, row))
        # isdecimal() (unlike isdigit()) only accepts characters int() can parse.
        record['release_year'] = int(row[7]) if row[7].isdecimal() else None
        return [record]

class GenreAnalysis(beam.PTransform):
    """Composite transform that counts how many records list each genre.

    Input: dicts with a comma-separated `listed_in` string.
    Output: dicts of the form {'genre': <name>, 'count': <int>}.
    """

    def expand(self, pcoll):
        genre_ones = pcoll | "Extract Genres" >> beam.FlatMap(
            lambda record: [(genre.strip(), 1) for genre in record['listed_in'].split(',')]
        )
        genre_totals = genre_ones | "Count Genres" >> beam.CombinePerKey(sum)
        return genre_totals | "Format Genre Results" >> beam.MapTuple(
            lambda genre, total: {'genre': genre, 'count': total}
        )

class ContentAnalysis(beam.PTransform):
    """Composite transform that counts records per `type` value.

    Output: dicts of the form {'type': <content type>, 'count': <int>}.
    """

    def expand(self, pcoll):
        type_ones = pcoll | "Extract Types" >> beam.Map(lambda record: (record['type'], 1))
        type_totals = type_ones | "Count Types" >> beam.CombinePerKey(sum)
        return type_totals | "Format Content Results" >> beam.MapTuple(
            lambda content_type, total: {'type': content_type, 'count': total}
        )

def run_streaming_pipeline(input_path, output_path):
    """Execute streaming pipeline

    Reads `input_path` (CSV with one header line), parses each line with
    ParseCSV, and writes two JSON-lines outputs under `output_path`:
    per-genre counts (prefix `genres`) and per-content-type counts
    (prefix `types`), both with a `.jsonl` suffix.

    NOTE(review): streaming mode is enabled, but the source is a bounded text
    file and no windowing is applied. Aggregation therefore happens over the
    single default global window.
    """
    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=options) as pipeline:
        titles = (
            pipeline
            | "Read CSV" >> beam.io.ReadFromText(input_path, skip_header_lines=1)
            | "Parse CSV" >> beam.ParDo(ParseCSV())
        )

        genre_lines = (
            titles
            | "Analyze Genres" >> GenreAnalysis()
            | "Format Genre Output" >> beam.Map(json.dumps)
        )
        type_lines = (
            titles
            | "Analyze Content" >> ContentAnalysis()
            | "Format Type Output" >> beam.Map(json.dumps)
        )

        _ = genre_lines | "Write Genres" >> beam.io.WriteToText(
            f"{output_path}/genres", file_name_suffix='.jsonl')
        _ = type_lines | "Write Types" >> beam.io.WriteToText(
            f"{output_path}/types", file_name_suffix='.jsonl')

def process_streaming_results(output_path):
    """Process and display the results

    Reads every JSON-lines file under `output_path` matching `genres*` and
    `types*`. It prints the ten highest genre counts, then all content-type
    counts, each sorted by count from highest to lowest. Blank lines and lines
    that are not valid JSON are skipped.
    """
    import glob

    def load_records(file_pattern):
        records = []
        for shard in glob.glob(file_pattern):
            with open(shard, 'r') as handle:
                for raw_line in handle:
                    if not raw_line.strip():
                        continue
                    try:
                        records.append(json.loads(raw_line))
                    except json.JSONDecodeError:
                        pass
        return records

    def by_count_desc(records):
        return sorted(records, key=lambda rec: rec['count'], reverse=True)

    print("\nGenre Analysis Results:")
    for rec in by_count_desc(load_records(f"{output_path}/genres*"))[:10]:
        print(f"{rec['genre']}: {rec['count']}")

    print("\nContent Type Analysis Results:")
    for rec in by_count_desc(load_records(f"{output_path}/types*")):
        print(f"{rec['type']}: {rec['count']}")

## Let's run the pipeline:

In [None]:
import os
import shutil

# Define paths. The input CSV comes from the kagglehub download directory
# (`path`, set in the dataset-loading cell above) rather than a hard-coded cache path.
input_path = os.path.join(path, 'netflix_titles.csv')
output_path = './beam_results_streaming'

# Start from an empty output directory. process_streaming_results() globs every
# genres*/types* file, so shards left over from a previous run could be double-counted.
shutil.rmtree(output_path, ignore_errors=True)
os.makedirs(output_path, exist_ok=True)

# Run the streaming pipeline
print("Starting streaming pipeline...")
run_streaming_pipeline(input_path, output_path)

# Process and display results
print("\nProcessing streaming results...")
process_streaming_results(output_path)

Starting streaming pipeline...





Processing streaming results...

Genre Analysis Results:
International Movies: 2752
Dramas: 2427
Comedies: 1674
International TV Shows: 1351
Documentaries: 868
Action & Adventure: 859
TV Dramas: 763
Independent Movies: 756
Children & Family Movies: 641
Romantic Movies: 616

Content Type Analysis Results:
Movie: 6130
TV Show: 2676
