In [1]:
# Install every third-party package this notebook imports.
# %pip (rather than !pip) installs into the environment of the running kernel.
# TODO: pin versions (pkg==x.y.z) once the target Airflow/Spark versions are decided.
%pip install pyspark apache-airflow
%pip install nltk pandas psycopg2-binary



In [None]:
# Initialize the Airflow metadata database (shell command, hence the leading "!").
# `initdb` is the Airflow 1.x spelling; Airflow 2.x renamed it to `airflow db init`.
!airflow initdb

In [None]:
import os
from datetime import datetime, timedelta

import nltk
import pandas as pd
import psycopg2
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Fetch the VADER lexicon used by NLTK's SentimentIntensityAnalyzer
nltk.download('vader_lexicon')

# Module-level Spark session (getOrCreate reuses an already-active session if there is one)
spark = (
    SparkSession.builder
    .appName("YouTubeSentimentAnalysis")
    .getOrCreate()
)

def run_spark_job():
    """Label YouTube comments with VADER sentiment on Spark and load them into Redshift.

    Steps:
      1. Read the comments CSV from HDFS into a Spark DataFrame.
      2. Add a ``Sentiment`` column ("Positive" / "Negative" / "Neutral") computed
         from the ``cleaned_comment`` column; null comments stay null.
      3. Collect the result to pandas and write it to a local CSV file.
      4. Create the ``youtube_sentiment`` table if it does not exist and COPY
         the CSV into it.

    Connection settings are read from the environment variables REDSHIFT_DB,
    REDSHIFT_USER, REDSHIFT_PASSWORD, REDSHIFT_HOST, REDSHIFT_PORT,
    AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY; when a variable is unset the
    placeholder value that used to be hardcoded here is used instead.

    Returns:
        None. Prints a confirmation message after the COPY has been committed.

    Raises:
        Any exception raised by Spark, pandas or psycopg2 propagates to the
        caller (so Airflow can retry the task); the Spark session and the
        Redshift connection are released first.
    """
    # HDFS file path
    hdfs_file_path = 'hdfs://localhost/youtube_comments.csv'  # Replace with your HDFS file path
    csv_output_path = "/path/to/output21.csv"

    # Acquire the session inside the function: getOrCreate() reuses an active
    # session or builds a new one, so the job can run again in the same process
    # after a previous call has stopped Spark.
    session = SparkSession.builder.appName("YouTubeSentimentAnalysis").getOrCreate()
    try:
        # Read data from HDFS into a Spark DataFrame
        df = session.read.csv(hdfs_file_path, header=True, inferSchema=True)

        analyzer = SentimentIntensityAnalyzer()

        def analyze_sentiment(text):
            """Map a comment to Positive/Negative/Neutral via VADER's compound score."""
            if text is None:
                # Null comment in, null label out, instead of failing the whole job.
                return None
            compound_score = analyzer.polarity_scores(text)['compound']
            if compound_score > 0.05:
                return "Positive"
            if compound_score < -0.05:
                return "Negative"
            return "Neutral"

        sentiment_udf = udf(analyze_sentiment, StringType())

        # Apply sentiment analysis UDF to the DataFrame
        df_with_sentiment = df.withColumn("Sentiment", sentiment_udf("cleaned_comment"))

        # Materialize on the driver; after this Spark is no longer needed.
        df_pandas = df_with_sentiment.toPandas()
    finally:
        # Always release Spark, including when the read or the UDF fails.
        session.stop()

    # Write the Pandas DataFrame to a CSV file (includes a header row)
    df_pandas.to_csv(csv_output_path, index=False)

    # Redshift connection settings: environment first, original placeholders as fallback.
    redshift_connection_params = {
        'dbname': os.environ.get('REDSHIFT_DB', 'your_redshift_db'),
        'user': os.environ.get('REDSHIFT_USER', 'redshift'),
        'password': os.environ.get('REDSHIFT_PASSWORD', 'redshift'),
        'host': os.environ.get('REDSHIFT_HOST', 'your_redshift_host'),
        'port': os.environ.get('REDSHIFT_PORT', 'your_redshift_port'),
    }
    aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID', 'your_access_key_id')
    aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY', 'your_secret_access_key')

    # Create Redshift table if not exists
    create_table_query = """
    CREATE TABLE IF NOT EXISTS youtube_sentiment (
        -- Define your table schema based on your DataFrame columns
        id VARCHAR,
        comment_text VARCHAR,
        Sentiment VARCHAR(10)
    );
    """

    # IGNOREHEADER 1 skips the header row that to_csv() wrote above; without it
    # the column names would be loaded as a data row.
    # NOTE(review): Redshift COPY loads from Amazon S3, EMR, DynamoDB or a remote
    # host over SSH, not from a path on this machine. The CSV has to be uploaded
    # to S3 and its s3:// URI used in FROM before this statement can succeed.
    copy_query = f"""
    COPY youtube_sentiment FROM '{csv_output_path}'
    CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}'
    CSV IGNOREHEADER 1;
    """

    # Establish connection to Redshift; close it even if a statement fails.
    conn = psycopg2.connect(**redshift_connection_params)
    try:
        with conn.cursor() as cursor:
            cursor.execute(create_table_query)
            conn.commit()

            # Copy data from CSV to Redshift
            cursor.execute(copy_query)
            conn.commit()
    finally:
        conn.close()

    # Print a message
    print("Data loaded into AWS Redshift")

# Defaults applied to every task of the DAG: no e-mail notifications, and a
# failed task is retried up to 5 times with 5 minutes between attempts.
default_args = {
    'owner': 'spark_nlp_v1',
    'depends_on_past': False,
    'start_date': datetime(2023, 6, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'youtube_sentiment_analysis',
    default_args=default_args,
    description='DAG for YouTube Sentiment Analysis',
    schedule_interval='@weekly',  # Airflow preset for cron "0 0 * * 0": Sundays at midnight
    # Without this the scheduler backfills one run for every week elapsed since
    # start_date, and each of those runs would re-load the same file into Redshift.
    catchup=False,
)

# Single task: run the Spark sentiment job and load its output into Redshift.
run_spark_task = PythonOperator(
    task_id='run_spark_job',
    python_callable=run_spark_job,
    dag=dag,
)

# Lets the file be executed directly to get the DAG's command-line interface.
# NOTE(review): inside a notebook kernel __name__ is also "__main__", so this
# branch runs there too and parses the kernel's own argv; it is meant for
# running the code as a .py script.
if __name__ == "__main__":
    dag.cli()


In [None]:
# Place both your Spark script and the Airflow DAG script in the same directory.

In [2]:
# Start the Airflow web server on port 8080.
# "!" runs it as a shell command; -D daemonizes the server so this cell returns
# instead of blocking the kernel. Drop -D to run it in the foreground of a terminal.
!airflow webserver -p 8080 -D


In [3]:
# Start the Airflow scheduler.
# "!" runs it as a shell command; -D daemonizes it so this cell returns.
# Alternatively run `airflow scheduler` (without -D) in a separate terminal.
!airflow scheduler -D


In [None]:
# Access the Airflow web UI at http://localhost:8080

In [None]:

This script sets up an Apache Airflow DAG for sentiment analysis on YouTube comments. 
It uses PySpark and NLTK's VADER for analysis, processes data from HDFS, and writes results to both CSV and AWS Redshift. 
The Airflow DAG is scheduled to run weekly.