# Consumer

In [24]:
from kafka import KafkaConsumer
from pyspark.sql.types import StructType, StructField, TimestampType, IntegerType, StringType
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import to_timestamp

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("page_id", IntegerType(), True),
    StructField("url", StringType(), True),
    StructField("head", StringType(), True),
    StructField("author", StringType(), True),
    StructField("category", StringType(), True),
    StructField("date", StringType(), True),
    StructField("tags", StringType(), True),
    StructField("text", StringType(), True),
    StructField("summary", StringType(), True),
    StructField("hashtags", StringType(), True),
    StructField("keywords", StringType(), True),
])

# offset: subscribe from beginning of time
consumer = KafkaConsumer('news_from_step3', bootstrap_servers='127.0.0.1:9092', auto_offset_reset='earliest')


In [25]:
import pandas as pd

rows = []
while True:
    partition = ""
    for _, partition in consumer.poll(timeout_ms=5000, max_records=999999).items():
        for msg in partition:
            data = msg.value.decode('utf-8').split('\t')
            rows.append({
                "id": int(data[0]),
                "page_id": int(data[1]),
                "url": data[2],
                "head": data[3],
                "author": data[4],
                "category": data[5],
                "date": data[6],
                "tags": data[7],
                "text": data[8],
                "summary": data[9],
                "hashtags": data[10],
                "keywords": data[11],
            })
            print(f'\r{len(rows)}', end='')
    if len(partition) == 0:
        break

df = pd.DataFrame(rows)


1597

# Cassandra

In [26]:
import uuid
from cassandra.cqlengine import columns, connection
from cassandra.cqlengine.management import sync_table
from cassandra.cqlengine.models import Model
from cassandra.cqlengine.query import BatchQuery


In [27]:
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])
session = cluster.connect()
session.execute('DROP KEYSPACE IF EXISTS news;')
session.execute("CREATE KEYSPACE IF NOT EXISTS news WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};")
session.execute("CREATE KEYSPACE IF NOT EXISTS test_db WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};")
session.shutdown()
session = cluster.connect('news')

# prepared = session.prepare("INSERT INTO news.2023 (id, name) VALUES (?, ?)")
# session.execute(prepared, (1, 'John'))


## Model

In [28]:
class NewsModel(Model):
    id       = columns.UUID(primary_key=True, default=uuid.uuid4)
    url      = columns.Text()
    head     = columns.Text()
    author   = columns.Text()
    category = columns.Text()
    date     = columns.Text()
    tags     = columns.Text()
    text     = columns.Text()
    summary  = columns.Text()
    hashtags  = columns.Text()
    keywords  = columns.Text()

connection.setup(['127.0.0.1'], "news", protocol_version=3)
sync_table(NewsModel)




## Batch

In [29]:
from datetime import datetime
date_format = "%m/%d/%Y %H:%M:%S"

# df_list = df.collect()
df_list = df.to_dict(orient='records')
batch_max_size = 2
df_batches = [df_list[i:i+batch_max_size] for i in range(0, len(df_list), batch_max_size)]

for df_batch in df_batches:
    batch = BatchQuery()
    for row in df_batch:
        NewsModel.batch(batch).create(
            # date_time = datetime.strptime(row['Date/Time'], date_format),
            url = row['url'],
            head = row['head'],
            author = row['author'],
            category = row['category'],
            date = row['date'],
            tags = row['tags'],
            text = row['text'],
            summary = row['summary'],
            hashtags = row['hashtags'],
            keywords = row['keywords'],
        )
    batch.execute()


In [31]:
row

{'id': 39142,
 'page_id': 10990255,
 'url': 'https://www.tgju.org/news/2617083/آیا-طلا-تا-پایان-2023-می-درخشد',
 'head': 'آیا طلا تا پایان 2023 می\u200cدرخشد؟',
 'author': 'معصومه دانش',
 'category': 'اخبار طلا و سکه',
 'date': '1402-01-30',
 'tags': "['بهای طلا', 'پناهگاه امن', 'رکورد بی\\u200cسابقه', 'تورم', 'سیاست پولی', 'اقتصاد جهانی', 'جذابیت طلا', 'فرود سخت', 'رکود اقتصادی', 'مبارزه با تورم', 'محافظ تورمی', 'اخبار ویژه']",
 'text': 'شبکه اطلاع\u200cرسانی طلا ارز کیتکونیوز اولیه\u200cیطلاو دلار اونس بحران بانک ماه گذشته دلیل موجه سرمایه\u200cگذار مجاب مقدار طلا سبد سهام داشتهباشند بنا گفته شرکت سرمایه\u200cگذار فلز گرانبها پایان سال عنوان دارایی امن ارزش موقعیت باقی خواهدماند ایمارو کازانووا معاون مدیر وجو فاستر مدیر پورتفو استراتژیست صندوق طلا گزارش هفته گذشته منتشر قیمت\u200cها حمایت قوی بالا دلار اونس برخوردار ارزش طلا دید ادامه نوشت تحولات ماه گذشته عنوان زنگ خطر عمل حضور طلا روگردان نقطه ورود ابدأ دلهره\u200cآور علیرغم افزایش سطح ریسک ماه مارس طلا رکوردهای بی\u200cسابقه رسید 

In [30]:
df.columns

Index(['id', 'page_id', 'url', 'head', 'author', 'category', 'date', 'tags',
       'text', 'summary', 'hashtags', 'keywords'],
      dtype='object')

# Run ./manage.py sync_cassandra
