In [16]:
import json
import requests
from kafka import KafkaProducer
from datetime import datetime
from langdetect import detect, LangDetectException

# Variables de configuration
GITHUB_TOKEN = "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"  # Remplacez par votre token GitHub
KAFKA_TOPIC = "github_repos"
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"

# Langues à filtrer (français et anglais)
ALLOWED_LANGUAGES = ["en"]  # 'en' pour anglais, 'fr' pour français

# Initialisation du producteur Kafka
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Fonction pour rechercher des dépôts GitHub créés dans un mois spécifique
def search_github_repos(month, year):
    query = f"created:{year}-{month:02d}-01..{year}-{month:02d}-30"
    url = f"https://api.github.com/search/repositories?q={query}&sort=created&order=desc&per_page=100"
    headers = {
        "Authorization": f"token {GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json"
    }

    repos = []
    while url:
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            data = response.json()
            repos.extend(data.get("items", []))
            # Vérifiez s'il existe une page suivante
            url = response.links.get('next', {}).get('url', None)
        else:
            print(f"Erreur lors de la récupération des dépôts : {response.status_code}")
            return []

    return repos

# Fonction pour détecter la langue du texte
def detect_language(text):
    try:
        return detect(text)
    except LangDetectException:
        return None

# Fonction pour filtrer les dépôts selon la langue naturelle (français ou anglais)
def filter_repos_by_language(repos):
    filtered_repos = []
    for repo in repos:
        description = repo.get("description", "")
        if description:  # Vérifier si la description n'est pas vide
            lang = detect_language(description)
            if lang in ALLOWED_LANGUAGES:
                filtered_repos.append(repo)
    return filtered_repos

# Fonction pour envoyer les noms des dépôts à Kafka
def send_to_kafka(repos):
    for repo in repos:
        try:
            repo_data = {
                "repo_name": repo["name"],
                "full_name": repo["full_name"],
                "html_url": repo["html_url"],
                "description": repo.get("description", ""),
                "created_at": repo.get("created_at", ""),
                "topics": repo.get("topics", []),
            }
            producer.send(KAFKA_TOPIC, value=repo_data)
            print(f"Dépôt envoyé à Kafka : {repo['name']}")
        except Exception as e:
            print(f"Erreur lors de l'envoi du dépôt {repo['name']} à Kafka: {str(e)}")

# Fonction principale pour récupérer et envoyer les dépôts
def collect_and_send(year):
    print(f"Recherche des dépôts GitHub créés en {year}...")

    # Parcours de chaque mois de l'année
    for month in range(1, 2):
        print(f"Recherche des dépôts GitHub créés en {month}/{year}...")
        repos = search_github_repos(month, year)
        
        if repos:
            # Filtrer les dépôts par langue naturelle
            filtered_repos = filter_repos_by_language(repos)
            if filtered_repos:
                send_to_kafka(filtered_repos)
            else:
                print(f"Aucun dépôt trouvé en anglais ou français pour {month}/{year}.")
        else:
            print(f"Aucun dépôt trouvé pour {month}/{year}.")


if __name__ == "__main__":
    try:
        # Exemple : récupérer les repos créés en 2024
        collect_and_send(2024)
    except KeyboardInterrupt:
        print("Arrêt du script.")
    finally:
        producer.close()


Recherche des dépôts GitHub créés en 2024...
Recherche des dépôts GitHub créés en 1/2024...
Dépôt envoyé à Kafka : GPT-SoVITS
Dépôt envoyé à Kafka : maybe
Dépôt envoyé à Kafka : windows
Dépôt envoyé à Kafka : fabric
Dépôt envoyé à Kafka : MiniCPM-o
Dépôt envoyé à Kafka : Scrapegraph-ai
Dépôt envoyé à Kafka : surya
Dépôt envoyé à Kafka : QAnything
Dépôt envoyé à Kafka : mihon
Dépôt envoyé à Kafka : pkl
Dépôt envoyé à Kafka : extensions
Dépôt envoyé à Kafka : lerobot
Dépôt envoyé à Kafka : sglang
Dépôt envoyé à Kafka : ebook2audiobook
Dépôt envoyé à Kafka : search_with_lepton
Dépôt envoyé à Kafka : Depth-Anything
Dépôt envoyé à Kafka : google-indexing-script
Dépôt envoyé à Kafka : MiniCPM
Dépôt envoyé à Kafka : earthworm
Dépôt envoyé à Kafka : BPB-Worker-Panel
Dépôt envoyé à Kafka : UFO
Dépôt envoyé à Kafka : agentscope
Dépôt envoyé à Kafka : OOTDiffusion
Dépôt envoyé à Kafka : awesome-prompts
Dépôt envoyé à Kafka : crawlee-python
Dépôt envoyé à Kafka : awesome-behavioral-interviews
Dépô

In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Création de la session Spark
spark = SparkSession \
    .builder \
    .appName("KafkaSparkConsumer") \
    .master("spark://spark:7077") \  # Connecter au master Spark du conteneur
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .getOrCreate()


# Définissez le schéma des données envoyées sur Kafka
schema = StructType([
    StructField("repo_name", StringType(), True),
    StructField("full_name", StringType(), True),
    StructField("html_url", StringType(), True),
    StructField("description", StringType(), True),
    StructField("created_at", StringType(), True),
    StructField("topics", ArrayType(StringType()), True)
])

# Configuration pour consommer les messages Kafka
kafka_bootstrap_servers = "kafka:9092"  # Référence au conteneur Kafka
kafka_topic = "github_repos"

# Lire le stream de Kafka
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .load()

# Convertir les valeurs en JSON structuré
df = df.selectExpr("CAST(value AS STRING)")

# Appliquer le schéma et extraire les colonnes
df_parsed = df.select(from_json(col("value"), schema).alias("data")).select("data.*")

# Afficher les résultats dans la console
query = df_parsed \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Attendre la fin du stream
query.awaitTermination()

IndentationError: unexpected indent (4270451573.py, line 10)