# Stream using Kafka for Pipeline

##### Config

In [14]:
import configparser

# Load configuration
config = configparser.RawConfigParser()
config.read('../config/config.ini')

# Azure Event Hub Kafka settings
KAFKA_BROKER = config['AZURE_EVENTHUB']['BROKER']
EVENT_HUB_NAME = config['AZURE_EVENTHUB']['EVENT_HUB_NAME']
KAFKA_SASL_USERNAME = config['AZURE_EVENTHUB']['SASL_USERNAME']
KAFKA_SASL_PASSWORD = config['AZURE_EVENTHUB']['SASL_PASSWORD']

# Reddit API Settings
REDDIT_CLIENT_ID = config['REDDIT_API']['CLIENT_ID']
REDDIT_CLIENT_SECRET = config['REDDIT_API']['CLIENT_SECRET']
REDDIT_USER_AGENT = config['REDDIT_API']['USER_AGENT']
REDDIT_USERNAME = config['REDDIT_API']['USERNAME']
REDDIT_PASSWORD = config['REDDIT_API']['PASSWORD']

print("Config loaded successfully!")

Config loaded successfully!


In [34]:
import praw
from kafka import KafkaProducer
import json
import time
import pandas as pd
import praw
import pandas as pd
import re

In [35]:
# Reddit API credentials
reddit = praw.Reddit(
    client_id=REDDIT_CLIENT_ID,
    client_secret=REDDIT_CLIENT_SECRET,
    user_agent=REDDIT_USER_AGENT,
    redirect_uri="http://localhost",
)

# Set up Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    sasl_mechanism="PLAIN",
    security_protocol="SASL_SSL",
    sasl_plain_username=KAFKA_SASL_USERNAME,
    sasl_plain_password=KAFKA_SASL_PASSWORD,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data as JSON
)

print("Kafka Producer configured successfully!")


Kafka Producer configured successfully!


In [25]:
def stream_reddit_to_kafka(subreddit_name, num_posts=20):
    """
    Fetch Reddit posts from the specified subreddit and produce them to Kafka.

    :param subreddit_name: Name of the subreddit to stream from.
    :param num_posts: Number of posts to fetch in one iteration.
    """
    subreddit = reddit.subreddit(subreddit_name)
    print(f"Streaming posts from r/{subreddit_name}...")
    for submission in subreddit.stream.submissions():
        post_data = {
            "id": submission.id,
            "title": submission.title,
            "author": str(submission.author),
            "created_utc": submission.created_utc,
            "url": submission.url,
            "num_comments": submission.num_comments,
            "score": submission.score
        }

        # Send to Kafka
        try:
            producer.send(EVENT_HUB_NAME, value=post_data)
            print(f"Sent to Kafka: {post_data['title']}")
        except Exception as e:
            print(f"Failed to send message to Kafka: {e}")
        
        time.sleep(0.5)  # Sleep for a short duration to simulate a streaming flow


In [28]:
# Test streaming from a specific subreddit
try:
    stream_reddit_to_kafka(subreddit_name="malefashionadvice", num_posts=20)
except KeyboardInterrupt:
    print("Streaming stopped.")
except Exception as e:
    print(f"Error: {e}")



Streaming posts from r/malefashionadvice...


In [30]:
df['url'][0]

'https://www.reddit.com/r/malefashionadvice/comments/1hepsl5/shoes_with_jeans/'

Kafka Stream to drop into hive tables and Hadoop

In [39]:
def fetch_reddit_posts(subreddit_name, num_posts=20, num_comments=3):
    """
    Fetch posts from a subreddit, including images, galleries, subreddit section, and top comments.

    :param subreddit_name: Name of the subreddit to fetch data from.
    :param num_posts: Number of posts to fetch.
    :param num_comments: Number of top comments to fetch per post.
    :return: DataFrame with Reddit post details (title, content, images, gallery, URL, section, top comments).
    """
    subreddit = reddit.subreddit(subreddit_name)
    posts = []

    # Updated regex to include jpeg, jpg, png, gif, imgur links
    image_regex = r"(https?://(?:i\.)?imgur\.com/[a-zA-Z0-9]+(?:\.jpg|\.jpeg|\.png|\.gif)?|" \
                  r"https?://.*\.(jpg|jpeg|png|gif))"

    # Fetch posts from the 'hot' section
    for submission in subreddit.hot(limit=num_posts):
        post_images = []

        # Extract image URLs from the post URL
        if submission.url:
            if re.search(image_regex, submission.url):
                post_images.append(submission.url)

        # Extract image links from the post's selftext
        if submission.selftext:
            images_in_text = re.findall(image_regex, submission.selftext)
            post_images.extend([img[0] for img in images_in_text])

        # Extract gallery images if the post contains a gallery
        if hasattr(submission, "gallery_data") and submission.gallery_data:
            media_metadata = submission.media_metadata
            for item in submission.gallery_data["items"]:
                media_id = item["media_id"]
                if media_id in media_metadata:
                    img_url = media_metadata[media_id]["s"]["u"]
                    post_images.append(img_url)

        # Remove duplicates in image links
        post_images = list(set(post_images))

        # Fetch top comments
        top_comments = []
        submission.comment_sort = "best"
        submission.comments.replace_more(limit=0)  # Remove "more comments" placeholders
        for comment in submission.comments[:num_comments]:
            if comment.body:
                top_comments.append(comment.body.strip())

        # Store post details
        post_data = {
            "title": submission.title,
            "content": submission.selftext,
            "images": ", ".join(post_images) if post_images else None,
            "url": submission.url,
            "section": "hot",
            "created_utc": submission.created_utc,
            "top_comments": " | ".join(top_comments) if top_comments else None
        }
        posts.append(post_data)

    # Convert to a DataFrame
    df = pd.DataFrame(posts)
    return df


In [40]:
import os
import requests
from PIL import Image
import torch.nn.functional as F
from torchvision import models, transforms
import torch
import pandas as pd

# Define the transform for preprocessing

transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize image to match model input size

    transforms.ToTensor(),          # Convert image to tensor

    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],  # Normalization mean (ImageNet values)

        std=[0.229, 0.224, 0.225]    # Normalization std (ImageNet values)

    )
])


# Assume the model is already loaded
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = models.resnet18(pretrained=False)
model.fc = torch.nn.Linear(model.fc.in_features, 45)  # Adjust for the number of categories
model.load_state_dict(torch.load("../ImageProcessingTool/notebooks/resnet18_fashion.pth", map_location=device))
model = model.to(device)
model.eval()  # Set to evaluation mode


# Preprocessing function (assume it's defined)
def preprocess_image(image_path):
    image = Image.open(image_path).convert("RGB")  # Ensure 3-channel RGB
    image = transform(image)
    return image.unsqueeze(0)  # Add batch dimension

# Prediction function
def predict_with_confidence(image_path, model, categories):
    # Preprocess the image
    image = preprocess_image(image_path).to(device)
    
    # Perform inference
    with torch.no_grad():
        outputs = model(image)
        probabilities = F.softmax(outputs, dim=1)  # Convert logits to probabilities
        confidence, predicted = torch.max(probabilities, 1)  # Get max confidence and class index
    
    return categories[predicted.item()], confidence.item()

# Helper function to download an image from a URL
def download_image(url, save_dir="images"):
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    try:
        response = requests.get(url, stream=True)
        response.raise_for_status()
        filename = os.path.join(save_dir, url.split("/")[-1])
        with open(filename, "wb") as file:
            file.write(response.content)
        return filename
    except Exception as e:
        print(f"Failed to download {url}: {e}")
        return None

# Categories for prediction
categories = ['Topwear', 'Flip Flops', 'Water Bottle', 'Bath and Body', 'Shoes', 'Watches', 'Lips', 'Scarves', 
              'Innerwear', 'Wristbands', 'Skin Care', 'Ties', 'Umbrellas', 'Headwear', 'Loungewear and Nightwear', 
              'Beauty Accessories', 'Eyewear', 'Mufflers', 'Free Gifts', 'Nails', 'Bottomwear', 'Wallets', 'Saree', 
              'Dress', 'Cufflinks', 'Home Furnishing', 'Vouchers', 'Jewellery', 'Apparel Set', 'Perfumes', 'Makeup', 
              'Belts', 'Fragrance', 'Skin', 'Sports Accessories', 'Socks', 'Bags', 'Eyes', 'Shoe Accessories', 
              'Sandal', 'Gloves', 'Stoles', 'Accessories', 'Hair', 'Sports Equipment']

# Process images in the DataFrame
def process_images_in_dataframe(df, image_column="images"):
    processed_results = []

    for index, row in df.iterrows():
        image_urls = row[image_column]
        if pd.isna(image_urls):
            processed_results.append(None)
            continue

        predictions = []
        for url in image_urls.split(", "):  # Handle multiple image URLs
            image_path = download_image(url)
            if image_path:
                try:
                    predicted_category, confidence = predict_with_confidence(image_path, model, categories)
                    predictions.append(f"{predicted_category} ({confidence:.2f})")
                except Exception as e:
                    print(f"Error processing image {image_path}: {e}")
            else:
                predictions.append(None)

        # Combine predictions for all images in the row
        processed_results.append(", ".join(filter(None, predictions)))

    # Add the predictions to a new column in the DataFrame
    df["images_processed"] = processed_results
    return df


  model.load_state_dict(torch.load("../ImageProcessingTool/notebooks/resnet18_fashion.pth", map_location=device))


Stream Data to Kafka

In [111]:
from kafka import KafkaProducer
import json
import pandas as pd

# Kafka producer configuration
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    sasl_mechanism="PLAIN",
    security_protocol="SASL_SSL",
    sasl_plain_username=KAFKA_SASL_USERNAME,
    sasl_plain_password=KAFKA_SASL_PASSWORD,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize data as JSON
)

# Function to send processed DataFrame rows to Kafka
def send_to_kafka(df, topic=EVENT_HUB_NAME):
    for index, row in df.iterrows():
        message = {
            "title": row["title"],
            "content": row["content"],
            "images_processed": row["images_processed"],
            "url": row["url"],
            "section": row["section"],
            "created_utc": row["created_utc"],
            "top_comments": row["top_comments"]
        }
        producer.send(topic, value=message)
        print(f"Sent to Kafka: {message}")

# Fetch and process Reddit data
subreddit_name = "femalefashionadvice"
num_posts = 100
num_comments = 3

reddit_df = fetch_reddit_posts(subreddit_name, num_posts, num_comments)
reddit_df = process_images_in_dataframe(reddit_df, image_column="images")
send_to_kafka(reddit_df)

2024-12-30 22:53:37,228 - INFO - <BrokerConnection node_id=bootstrap-0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <connecting> [IPv4 ('52.191.45.161', 9093)]>: connecting to FashionTrendsEventHubs.servicebus.windows.net:9093 [('52.191.45.161', 9093) IPv4]
2024-12-30 22:53:37,230 - INFO - Probing node bootstrap-0 broker version
2024-12-30 22:53:37,264 - INFO - <BrokerConnection node_id=bootstrap-0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <handshake> [IPv4 ('52.191.45.161', 9093)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile='/opt/homebrew/etc/openssl@3/cert.pem', capath='/opt/homebrew/etc/openssl@3/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/opt/homebrew/etc/openssl@3/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/opt/homebrew/etc/openssl@3/certs')
2024-12-30 22:53:37,384 - INFO - <BrokerConnection node_id=bootstrap-0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <authenticating> [IPv4 ('52.191.45.

Failed to download https://i.imgur.com/YEkdxEH.png: 429 Client Error: Unknown Error for url: https://i.imgur.com/YEkdxEH.png
Sent to Kafka: {'title': 'Fashion news - December 30, 2024', 'content': 'Here you can share all the fashion related news you have come across recently. Whether it is a newly released runway, an interview with a designer, a new label slashing onto the scene or speculating new trends emerging, everything news related is welcome here.', 'images_processed': None, 'url': 'https://www.reddit.com/r/femalefashionadvice/comments/1hpkwbo/fashion_news_december_30_2024/', 'section': 'hot', 'created_utc': 1735556500.0, 'top_comments': None}
Sent to Kafka: {'title': 'If you are celebrating NYE, what are you planning to wear? I am especially interested to hear what those celebrating in cold weather will wear!', 'content': 'New Year\'s Eve is my favorite holiday. I love that it\'s global, is typically celebrated with friends rather than family, and that it\'s a chance to wear so

2024-12-30 22:54:43,148 - INFO - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <connecting> [IPv4 ('52.191.45.161', 9093)]>: connecting to FashionTrendsEventHubs.servicebus.windows.net:9093 [('52.191.45.161', 9093) IPv4]
2024-12-30 22:54:43,199 - INFO - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <handshake> [IPv4 ('52.191.45.161', 9093)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile='/opt/homebrew/etc/openssl@3/cert.pem', capath='/opt/homebrew/etc/openssl@3/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/opt/homebrew/etc/openssl@3/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/opt/homebrew/etc/openssl@3/certs')


2024-12-30 22:54:43,319 - INFO - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <authenticating> [IPv4 ('52.191.45.161', 9093)]>: Authenticated as $ConnectionString via PLAIN
2024-12-30 22:54:43,320 - INFO - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <authenticating> [IPv4 ('52.191.45.161', 9093)]>: Connection complete.
2024-12-30 22:54:43,320 - INFO - <BrokerConnection node_id=bootstrap-0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <connected> [IPv4 ('52.191.45.161', 9093)]>: Closing connection. 


Kafka Consumer to Store Data in HDFS


In [113]:
from hdfs import InsecureClient

# HDFS configuration
HDFS_URL = "http://localhost:9870"  # Replace with your HDFS NameNode address
hdfs_client = InsecureClient(HDFS_URL, user="root")  # Replace 'root' with your HDFS username

# Check HDFS connection by listing the root directory
try:
    files = hdfs_client.list("/")
    print("Connection successful. Files in HDFS root directory:")
    print(files)
except Exception as e:
    print(f"Failed to connect to HDFS: {e}")


2024-12-30 22:54:58,526 - INFO - Instantiated <InsecureClient(url='http://localhost:9870')>.
2024-12-30 22:54:58,529 - INFO - Listing '/'.


Connection successful. Files in HDFS root directory:
['reddit_data', 'rmstate', 'tmp', 'user']


In [114]:
from kafka import KafkaConsumer
import json

# Kafka consumer configuration
consumer = KafkaConsumer(
    EVENT_HUB_NAME,
    bootstrap_servers=KAFKA_BROKER,
    sasl_mechanism="PLAIN",
    security_protocol="SASL_SSL",
    sasl_plain_username=KAFKA_SASL_USERNAME,
    sasl_plain_password=KAFKA_SASL_PASSWORD,
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    fetch_min_bytes=1,  # Minimum amount of data (in bytes) to fetch
    fetch_max_wait_ms=100,  # Maximum wait time (in ms) before responding
    heartbeat_interval_ms=3000,  # Interval for heartbeats (keep consumer alive)
    auto_offset_reset='earliest',  # Start from the earliest message
    enable_auto_commit=True  # Commit offsets automatically
)

# Print messages from the Kafka topic
try:
    print("Listening for messages...")
    for message in consumer:
        print(f"Received message: {message.value}")
except KeyboardInterrupt:
    print("Stopped listening.")
except Exception as e:
    print(f"Error: {e}")
finally:
    consumer.close()


2024-12-30 22:55:03,671 - INFO - <BrokerConnection node_id=bootstrap-0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <connecting> [IPv4 ('52.191.45.161', 9093)]>: connecting to FashionTrendsEventHubs.servicebus.windows.net:9093 [('52.191.45.161', 9093) IPv4]
2024-12-30 22:55:03,749 - INFO - Probing node bootstrap-0 broker version
2024-12-30 22:55:03,776 - INFO - <BrokerConnection node_id=bootstrap-0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <handshake> [IPv4 ('52.191.45.161', 9093)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile='/opt/homebrew/etc/openssl@3/cert.pem', capath='/opt/homebrew/etc/openssl@3/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/opt/homebrew/etc/openssl@3/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/opt/homebrew/etc/openssl@3/certs')
2024-12-30 22:55:03,939 - INFO - <BrokerConnection node_id=bootstrap-0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <authenticating> [IPv4 ('52.191.45.

Listening for messages...


2024-12-30 22:55:04,630 - INFO - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <authenticating> [IPv4 ('52.191.45.161', 9093)]>: Authenticated as $ConnectionString via PLAIN
2024-12-30 22:55:04,632 - INFO - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <authenticating> [IPv4 ('52.191.45.161', 9093)]>: Connection complete.
2024-12-30 22:55:04,633 - INFO - <BrokerConnection node_id=bootstrap-0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <connected> [IPv4 ('52.191.45.161', 9093)]>: Closing connection. 


Received message: {'title': 'Vibe shift: heading towards the end of quiet luxury?!', 'content': 'It seems like Who What Wear is predicting a vibe shift away from quiet luxury and towards maximalism, personalized style, and boho for spring / summer: [article](https://www.whowhatwear.com/fashion/trends/state-of-style-2024).\n\nHow do you feel about the shift?\n\nIf you are planning to incorporate any of the more maximal style trends, how are you planning to work them in?', 'images_processed': None, 'url': 'https://www.reddit.com/r/femalefashionadvice/comments/1hp6lu1/vibe_shift_heading_towards_the_end_of_quiet_luxury/', 'section': 'hot', 'created_utc': 1735508346.0, 'top_comments': "That would tie in with my hypothesis that recession pop is coming back. But I'm not a scientist | You’re always going to have people wearing “quiet luxury” because that’s just their personal style, but happy to see more colorful, eclectic style celebrated too...although I agree with another commenter here tha

2024-12-30 22:55:08,887 - INFO - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <connected> [IPv4 ('52.191.45.161', 9093)]>: Closing connection. 
2024-12-30 22:55:08,890 - ERROR - Fetch to node 0 failed: Cancelled: <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <connected> [IPv4 ('52.191.45.161', 9093)]>


Stopped listening.


docker exec -it namenode hdfs dfs -touchz /reddit_data/posts.json


In [115]:
from kafka import KafkaConsumer
from hdfs import InsecureClient
import json

# HDFS configuration
HDFS_URL = "http://localhost:9870"
HDFS_DIR = "/reddit_data"
hdfs_client = InsecureClient(HDFS_URL, user="root")
files = hdfs_client.list(HDFS_DIR)
print("Connected to HDFS successfully!")
print(files)

# Kafka consumer configuration
consumer = KafkaConsumer(
    EVENT_HUB_NAME,
    bootstrap_servers=KAFKA_BROKER,
    sasl_mechanism="PLAIN",
    security_protocol="SASL_SSL",
    sasl_plain_username=KAFKA_SASL_USERNAME,
    sasl_plain_password=KAFKA_SASL_PASSWORD,
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    fetch_min_bytes=1,  # Minimum amount of data (in bytes) to fetch
    fetch_max_wait_ms=100,  # Maximum wait time (in ms) before responding
    heartbeat_interval_ms=3000,  # Interval for heartbeats (keep consumer alive)
    auto_offset_reset='earliest',  # Start from the earliest message
    enable_auto_commit=True  # Commit offsets automatically
)

print ("Kafka Consumer configured successfully!")
print(consumer)
# Consume messages and write to HDFS
for message in consumer:
    print("Consuming message...")
    print(message.value)
    post_data = message.value
    print(f"Received from Kafka: {post_data}")

    # Write data to HDFS
    with hdfs_client.write(f"{HDFS_DIR}/posts.json", append=True, encoding='utf-8') as writer:
        writer.write(json.dumps(post_data) + "\n")


2024-12-30 22:55:12,086 - INFO - Instantiated <InsecureClient(url='http://localhost:9870')>.
2024-12-30 22:55:12,088 - INFO - Listing '/reddit_data'.
2024-12-30 22:55:12,322 - INFO - Fetching status for '/reddit_data'.
2024-12-30 22:55:12,397 - INFO - <BrokerConnection node_id=bootstrap-0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <connecting> [IPv4 ('52.191.45.161', 9093)]>: connecting to FashionTrendsEventHubs.servicebus.windows.net:9093 [('52.191.45.161', 9093) IPv4]
2024-12-30 22:55:12,398 - INFO - Probing node bootstrap-0 broker version
2024-12-30 22:55:12,425 - INFO - <BrokerConnection node_id=bootstrap-0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <handshake> [IPv4 ('52.191.45.161', 9093)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile='/opt/homebrew/etc/openssl@3/cert.pem', capath='/opt/homebrew/etc/openssl@3/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/opt/homebrew/etc/openssl@3/cert.pem', openssl_capath_env='SSL_CERT_D

Connected to HDFS successfully!
['posts.json']


2024-12-30 22:55:12,841 - INFO - Broker version identified as 2.2.0
2024-12-30 22:55:12,841 - INFO - Set configuration api_version=(2, 2, 0) to skip auto check_version requests on startup
2024-12-30 22:55:12,844 - INFO - Updating subscribed topics to: ('fashiontweets',)
2024-12-30 22:55:12,870 - INFO - Updated partition assignment: [TopicPartition(topic='fashiontweets', partition=0), TopicPartition(topic='fashiontweets', partition=1)]
2024-12-30 22:55:12,873 - INFO - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <connecting> [IPv4 ('52.191.45.161', 9093)]>: connecting to FashionTrendsEventHubs.servicebus.windows.net:9093 [('52.191.45.161', 9093) IPv4]
2024-12-30 22:55:12,979 - INFO - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <handshake> [IPv4 ('52.191.45.161', 9093)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile='/opt/homebrew/etc/openssl@3/cert.pem', capath='/opt/homebrew/etc/openssl@3/ce

Kafka Consumer configured successfully!
<kafka.consumer.group.KafkaConsumer object at 0x169b414c0>


2024-12-30 22:55:13,329 - INFO - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <authenticating> [IPv4 ('52.191.45.161', 9093)]>: Authenticated as $ConnectionString via PLAIN
2024-12-30 22:55:13,329 - INFO - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <authenticating> [IPv4 ('52.191.45.161', 9093)]>: Connection complete.
2024-12-30 22:55:13,329 - INFO - <BrokerConnection node_id=bootstrap-0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <connected> [IPv4 ('52.191.45.161', 9093)]>: Closing connection. 
2024-12-30 22:55:13,973 - INFO - Appending to '/reddit_data/posts.json'.


Consuming message...
{'title': 'Fashion news - December 30, 2024', 'content': 'Here you can share all the fashion related news you have come across recently. Whether it is a newly released runway, an interview with a designer, a new label slashing onto the scene or speculating new trends emerging, everything news related is welcome here.', 'images_processed': None, 'url': 'https://www.reddit.com/r/femalefashionadvice/comments/1hpkwbo/fashion_news_december_30_2024/', 'section': 'hot', 'created_utc': 1735556500.0, 'top_comments': None}
Received from Kafka: {'title': 'Fashion news - December 30, 2024', 'content': 'Here you can share all the fashion related news you have come across recently. Whether it is a newly released runway, an interview with a designer, a new label slashing onto the scene or speculating new trends emerging, everything news related is welcome here.', 'images_processed': None, 'url': 'https://www.reddit.com/r/femalefashionadvice/comments/1hpkwbo/fashion_news_december_

2024-12-30 22:55:14,155 - ERROR - Exception in child.
Traceback (most recent call last):
  File "/Users/Param/Desktop/Coding/Projects/FashionTrendsProject/Fashion-Trends-Sentiment-Analysis-and-Forecasting/venv/lib/python3.12/site-packages/urllib3/connection.py", line 199, in _new_conn
    sock = connection.create_connection(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/Param/Desktop/Coding/Projects/FashionTrendsProject/Fashion-Trends-Sentiment-Analysis-and-Forecasting/venv/lib/python3.12/site-packages/urllib3/util/connection.py", line 60, in create_connection
    for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/socket.py", line 976, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

ConnectionError: HTTPConnectionPool(host='5e78466d6542', port=9864): Max retries exceeded with url: /webhdfs/v1/reddit_data/posts.json?op=APPEND&user.name=root&namenoderpcaddress=namenode:9000&user.name=root (Caused by NameResolutionError("<urllib3.connection.HTTPConnection object at 0x169ce6f00>: Failed to resolve '5e78466d6542' ([Errno 8] nodename nor servname provided, or not known)"))

In [None]:
import json
import logging
from kafka import KafkaConsumer
from hdfs import InsecureClient
from hdfs.util import HdfsError

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("consumer.log"),
        logging.StreamHandler()
    ]
)

# HDFS configuration
HDFS_URL = "http://localhost:9870"  # Adjust if needed
HDFS_DIR = "/reddit_data"
HDFS_FILE = f"{HDFS_DIR}/posts.json"
hdfs_client = InsecureClient(HDFS_URL, user="root")

# Ensure HDFS directory exists
def ensure_hdfs_path():
    try:
        if not hdfs_client.status(HDFS_DIR, strict=False):
            hdfs_client.makedirs(HDFS_DIR)
            logging.info(f"Created HDFS directory: {HDFS_DIR}")
    except HdfsError as e:
        logging.error(f"HDFS Error: {e}")
        raise

# Write to HDFS with lease conflict resolution
def write_to_hdfs(data):
    try:
        if not hdfs_client.status(HDFS_FILE, strict=False):
            logging.info(f"File {HDFS_FILE} does not exist. Creating it...")
            with hdfs_client.write(HDFS_FILE, encoding='utf-8') as writer:
                writer.write(data + '\n')
        else:
            with hdfs_client.write(HDFS_FILE, encoding='utf-8', append=True) as writer:
                writer.write(data + '\n')
        logging.info(f"Written to HDFS file: {HDFS_FILE}")
    except HdfsError as e:
        if "File is currently under construction" in str(e) or "lease" in str(e):
            logging.warning(f"Lease conflict detected for {HDFS_FILE}. Attempting recovery.")
            recover_lease(HDFS_FILE)
            write_to_hdfs(data)  # Retry after recovering lease
        else:
            logging.error(f"HDFS Error while writing to file: {e}")

# Recover lease on HDFS file
def recover_lease(file_path):
    try:
        hdfs_client.recoverLease(file_path)
        logging.info(f"Lease recovered for {file_path}")
    except Exception as e:
        logging.error(f"Failed to recover lease for {file_path}: {e}")

# Kafka consumer configuration
consumer = KafkaConsumer(
    EVENT_HUB_NAME,
    bootstrap_servers=KAFKA_BROKER,
    sasl_mechanism="PLAIN",
    security_protocol="SASL_SSL",
    sasl_plain_username=KAFKA_SASL_USERNAME,
    sasl_plain_password=KAFKA_SASL_PASSWORD,
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True
)

# Main process with batching
def main():
    logging.info("Starting Kafka Consumer to HDFS...")
    ensure_hdfs_path()
    buffer = []

    try:
        for message in consumer:
            post_data = message.value
            logging.info(f"Received message: {json.dumps(post_data, indent=2)}")
            buffer.append(json.dumps(post_data))  # Add to buffer

            # Write in batches
            if len(buffer) >= 10:  # Adjust batch size as needed
                batch_write_to_hdfs(buffer)
                buffer.clear()  # Clear buffer after writing

    except KeyboardInterrupt:
        logging.info("Kafka Consumer stopped by user.")
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
    finally:
        # Write remaining buffer to HDFS
        if buffer:
            batch_write_to_hdfs(buffer)
        consumer.close()
        logging.info("Kafka Consumer closed.")

# Batch write to HDFS
def batch_write_to_hdfs(buffer):
    try:
        data = "\n".join(buffer) + "\n"  # Prepare batch data
        write_to_hdfs(data)
    except Exception as e:
        logging.error(f"Failed to write batch to HDFS: {e}")

if __name__ == "__main__":
    main()


2024-12-30 22:55:33,378 - INFO - Instantiated <InsecureClient(url='http://localhost:9870')>.
2024-12-30 22:55:33,437 - INFO - <BrokerConnection node_id=bootstrap-0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <connecting> [IPv4 ('52.191.45.161', 9093)]>: connecting to FashionTrendsEventHubs.servicebus.windows.net:9093 [('52.191.45.161', 9093) IPv4]
2024-12-30 22:55:33,437 - INFO - Probing node bootstrap-0 broker version
2024-12-30 22:55:33,466 - INFO - <BrokerConnection node_id=bootstrap-0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <handshake> [IPv4 ('52.191.45.161', 9093)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile='/opt/homebrew/etc/openssl@3/cert.pem', capath='/opt/homebrew/etc/openssl@3/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/opt/homebrew/etc/openssl@3/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/opt/homebrew/etc/openssl@3/certs')
2024-12-30 22:55:33,576 - INFO - <BrokerConnection node_id=bootstrap-

2024-12-30 23:36:27,804 - ERROR - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <connected> [IPv4 ('52.191.45.161', 9093)]>: socket disconnected
2024-12-30 23:36:27,862 - INFO - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <connected> [IPv4 ('52.191.45.161', 9093)]>: Closing connection. KafkaConnectionError: socket disconnected
2024-12-30 23:36:28,165 - INFO - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <connecting> [IPv4 ('52.191.45.161', 9093)]>: connecting to FashionTrendsEventHubs.servicebus.windows.net:9093 [('52.191.45.161', 9093) IPv4]
2024-12-30 23:36:28,394 - INFO - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <authenticating> [IPv4 ('52.191.45.161', 9093)]>: Authenticated as $ConnectionString via PLAIN
2024-12-30 23:36:28,395 - INFO - <BrokerConnection node_id=0 host=FashionTrendsEventHubs.servicebus.windows.net:9093 <auth