In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the read-only Kaggle input directory and print the full path of every file found.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Clear the output directory (/kaggle/working)
#!rm -rf /kaggle/working/*

In [None]:
import json
import os
import sys
from datetime import date, datetime, timezone

import tiktoken
from pydantic import BaseModel
from transformers import pipeline

In [None]:
#!pip install loguru
from loguru import logger
logger.remove() # drop the handlers registered so far (including the default console one)
# Send log records to the notebook output again
logger.add(sys.stdout)
# Also write records to a log file that rotates at 1 MB.
# enqueue=True would route records through a non-blocking queue; it is disabled here.
logger.add("async_log.log", format="{time} | {level} | {message}", rotation="1 MB", enqueue=False)

In [None]:
# Upgrade deltalake / polars to the latest versions (uncomment the pip lines below if needed)
#!pip install -U deltalake
import deltalake
#!pip install -U polars  1.32.
import polars as pl

In [31]:
# Connect to Cloudflare R2 (S3-compatible storage) - optional
from kaggle_secrets import UserSecretsClient
# Credentials come from Kaggle secrets so they never appear in the notebook itself.
user_secrets = UserSecretsClient()
ACCESS_KEY = user_secrets.get_secret("ACCESS_KEY")
SECRET_KEY = user_secrets.get_secret("SECRET_KEY")
ENDPOINT_URL = "https://9cf55f33e92c95201664f2c62ca31641.r2.cloudflarestorage.com/"

# Options handed to the Delta writer for the remote table.
storage_options = dict(
    AWS_ACCESS_KEY_ID=ACCESS_KEY,
    AWS_SECRET_ACCESS_KEY=SECRET_KEY,
    # S3-compatible endpoint, e.g. https://s3.us-east-1.wasabisys.com
    AWS_ENDPOINT_URL=ENDPOINT_URL,
    # For MinIO/R2: safe concurrent writes without DynamoDB
    aws_conditional_put="etag",
)

In [None]:
# Pipeline configuration
BATCH_SIZE = 25_000  # number of posts collected before a batch is scored and saved
INPUT_DIR = "/kaggle/input/subreddit-btc/"  # directory whose files are read line by line
OUTPUT_DIR = "/tmp/stage"  # local path the Delta table is appended to

class RedditItem(BaseModel):
    """Validated subset of one raw Reddit post kept for sentiment scoring."""
    selftext: str  # post body; taken from the JSON 'selftext' field
    upvotes: int  # taken from the JSON 'score' field
    num_of_comments: int  # taken from the JSON 'num_comments' field
    date_: date  # UTC calendar date derived from the JSON 'created_utc' timestamp

class ProcessedItem(BaseModel):
    """One scored post; each instance becomes a row of the Delta table."""
    text: str  # the post body that was scored (RedditItem.selftext)
    upvotes: int  # copied from RedditItem.upvotes
    numofcomms: int  # copied from RedditItem.num_of_comments
    sentiment: float  # signed score: +score for positive, -score for negative, 0 otherwise
    date_: date  # copied from RedditItem.date_

class SentimentHuggingFaceModel:
    """Wraps a Hugging Face sentiment pipeline and maps its output to a signed score."""

    # Texts are cut to this many characters before being scored.
    MAX_CHARS = 500
    # Token budget enforced by the tokenizer. A character cut alone does not
    # bound the token count (multi-byte characters can expand to several tokens).
    # NOTE(review): 512 is the usual limit for RoBERTa-base checkpoints - confirm for this model.
    MAX_TOKENS = 512

    def __init__(self):
        """Load the ``cardiffnlp/twitter-roberta-base-sentiment-latest`` pipeline."""
        self.sentiment_model = pipeline(
            "sentiment-analysis",
            model="cardiffnlp/twitter-roberta-base-sentiment-latest",
            batch_size=16
        )

    def get_sentiment(self, selftext: str) -> float:
        """Compute the sentiment of a text as a signed confidence score.

        :param selftext: text to score; only the first ``MAX_CHARS`` characters are used
        :return: ``+score`` for a positive label, ``-score`` for a negative label,
                 ``0.0`` for any other label or when scoring fails (the failure is logged)
        """
        try:
            result = self.sentiment_model(
                selftext[:self.MAX_CHARS],
                # Let the tokenizer truncate as well, so an over-long token
                # sequence is shortened instead of failing inside the model.
                truncation=True,
                max_length=self.MAX_TOKENS,
            )
            label = result[0]["label"].lower()
            score = result[0]["score"]

            if label == "positive":
                return score
            if label == "negative":
                return -score
            return 0.0
        except Exception as e:
            # Best effort: a failure on one text must not abort the whole batch.
            # Reported through the shared logger so it also reaches the log file.
            logger.error(f"Ошибка API: {e}")
            return 0.0

In [None]:
# Functions implementing the core processing logic
def _parse_reddit_line(line):
    """Turn one JSON line into a RedditItem, or return None when the post has no usable text."""
    data = json.loads(line)
    # ``or ''`` also covers an explicit JSON null, not only a missing key.
    selftext = data.get('selftext') or ''

    stripped = selftext.strip()
    if not stripped or stripped.lower() in {"[deleted]", "[removed]"}:
        return None

    created_utc = int(data.get('created_utc', 0) or 0)
    return RedditItem(
        selftext=selftext,
        upvotes=data.get('score', 0),
        num_of_comments=data.get('num_comments', 0),
        # Timezone-aware replacement for the deprecated datetime.utcfromtimestamp().
        date_=datetime.fromtimestamp(created_utc, tz=timezone.utc).date()
    )


def read_and_process_files(sentiment_model, input_dir, output_dir, batch_size):
    """Read JSON-lines files from ``input_dir`` and process the posts in batches.

    Every regular file directly inside ``input_dir`` is read line by line as UTF-8.
    Posts with empty, "[deleted]" or "[removed]" text are skipped. Lines that cannot
    be parsed or validated are logged and skipped. Each time ``batch_size`` items have
    been collected they are passed to ``process_batch``; the remainder is processed
    at the end.

    :param sentiment_model: object passed through to ``process_batch``
    :param input_dir: directory containing the input files
    :param output_dir: destination passed through to ``process_batch``
    :param batch_size: number of items per batch
    """
    batch = []

    # Sorted so repeated runs visit the files in the same order.
    for filename in sorted(os.listdir(input_dir)):
        file_path = os.path.join(input_dir, filename)
        if not os.path.isfile(file_path):
            continue

        with open(file_path, 'r', encoding='utf-8') as file:
            for line_number, line in enumerate(file, start=1):
                # Blank lines are not records; skip them without logging an error.
                if not line.strip():
                    continue

                try:
                    item = _parse_reddit_line(line)
                except Exception as e:
                    logger.error(f'Read file error: {e}, file: {file_path}, line: {line_number}')
                    continue

                if item is None:
                    continue

                batch.append(item)
                if len(batch) < batch_size:
                    continue

                # Process the batch once it is full.
                logger.info(f'Processing batch of {len(batch)} items')
                try:
                    process_batch(sentiment_model, batch, output_dir)
                except Exception as e:
                    logger.error(f'Error processing batch: {e}, file: {file_path}')
                # Always start a fresh batch, even after a failure; otherwise the
                # same items would be re-processed on every following line.
                batch = []

    if batch:
        logger.info(f'Processing final batch of {len(batch)} items')
        try:
            process_batch(sentiment_model, batch, output_dir)
        except Exception as e:
            # The final batch can span several files, so no single file is named here.
            logger.error(f"Error processing final batch: {e}")

def process_batch(sentiment_model, batch, output_dir):
    """Score every item of a batch and save the successfully processed ones.

    Items whose scoring or validation raises are logged and left out.
    Nothing is saved when no item succeeds.
    """
    logger.info('Batch processing started')

    results = []
    for reddit_item in batch:
        try:
            score = sentiment_model.get_sentiment(reddit_item.selftext)
            record = ProcessedItem(
                text=reddit_item.selftext,
                upvotes=reddit_item.upvotes,
                numofcomms=reddit_item.num_of_comments,
                sentiment=score,
                date_=reddit_item.date_
            )
        except Exception as e:
            logger.error(f'Processing item error: {e}')
        else:
            results.append(record)

    if not results:
        return

    logger.info(f'Batch processing finished, saving {len(results)} items')
    save_to_delta(results, output_dir)

def save_to_delta(data, output_dir):
    """Append the given ProcessedItem objects to the Delta table at ``output_dir``.

    Any failure is logged and swallowed, so the caller is not interrupted.
    """
    try:
        # One plain dict per item; polars builds the frame from these rows.
        rows = []
        for record in data:
            rows.append(record.model_dump())
        frame = pl.DataFrame(rows)
        frame.write_delta(output_dir, mode="append")
        logger.info('Delta table saved')
    except Exception as e:
        logger.error(f'Save delta error: {e}')

def set_sentiment():
    """Entry point: build the sentiment model and run the pipeline with the notebook constants."""
    read_and_process_files(
        SentimentHuggingFaceModel(),
        INPUT_DIR,
        OUTPUT_DIR,
        BATCH_SIZE,
    )

In [None]:
# !!!!! RUN THE COMPUTATION !!!! (loads the model and processes every file in INPUT_DIR)
set_sentiment()

In [None]:
# View the logs: print the whole log file.
# NOTE(review): the logger is configured with the relative path "async_log.log";
# this absolute path matches only when the working directory is /kaggle/working - confirm.
with open('/kaggle/working/async_log.log', 'r') as f:
    logs = f.read()
print(logs)


In [None]:
# Check the number of lines in one input file
with open('/kaggle/input/subreddit-btc/btc_part_aa') as f:
    line_count = sum(1 for _line in f)
print(line_count)

In [None]:
# Read the contents of a Delta table and display it.
# NOTE(review): this reads an attached input dataset, not OUTPUT_DIR written by
# set_sentiment() - confirm this is the intended source.
DIR_ = '/kaggle/input/zip-outputs/'
#DIR_ = '/kaggle/working/sentiment_data'
df = pl.read_delta(DIR_)
df

In [32]:
# Write the frame to the external S3 table.
# mode="overwrite" is used, so this is not an append to the existing table.
table_path = "s3://bucket01/sentiment_data/"
df.write_delta(table_path, storage_options=storage_options, mode="overwrite")

In [None]:
# Copy the staged results from /tmp/stage into the notebook output directory (/kaggle/working),
# then zip the contents of that directory into output_files.zip.
!cp -r /tmp/stage/* /kaggle/working/
!cd /kaggle/working/ && zip -r ./output_files.zip .

In [None]:
# Helper for later use: upload files from local folders to remote S3 storage
import boto3
from botocore.exceptions import ClientError

def upload_to_s3(file_path, bucket_name, object_name=None, *,
                 aws_access_key_id=None, aws_secret_access_key=None,
                 endpoint_url=None):
    """
    Upload a file to an S3 (or S3-compatible) bucket

    :param file_path: Path to file to upload
    :param bucket_name: Target S3 bucket name
    :param object_name: S3 object name (defaults to file name)
    :param aws_access_key_id: Access key; when None, boto3's default credential lookup is used
    :param aws_secret_access_key: Secret key; when None, boto3's default credential lookup is used
    :param endpoint_url: Custom endpoint for S3-compatible storage; None means the AWS default
    :return: True if successful, False otherwise
    """

    # Initialize S3 client. The credentials are explicit parameters; the previous
    # version read undefined global names and failed with NameError on every call.
    s3_client = boto3.client(
        's3',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        endpoint_url=endpoint_url
    )

    # Default object name to file name
    if object_name is None:
        object_name = os.path.basename(file_path)

    try:
        # Upload with public read access
        # NOTE(review): this makes the uploaded object world-readable - confirm that is intended.
        s3_client.upload_file(
            file_path,
            bucket_name,
            object_name,
            ExtraArgs={'ACL': 'public-read'}  # Remove if private upload needed
        )
        print(f"Successfully uploaded {file_path} to s3://{bucket_name}/{object_name}")
        return True
    except (ClientError, boto3.exceptions.S3UploadFailedError) as e:
        # upload_file re-raises client errors from the transfer as S3UploadFailedError,
        # so both types must be caught for a failed upload to return False.
        print(f"Upload failed: {e}")
        return False
    except FileNotFoundError:
        print(f"File not found: {file_path}")
        return False

# Example usage:
# upload_to_s3('data.csv', 'my-s3-bucket', 'kaggle-data/data.csv')