# In [4]:
# %% [markdown]
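# # 06) Download GH Archive files and store them decompressed in Azure Blob Storage
# 
# - **Workflow:**
#   1. Generate the list of GH Archive file URLs for the configured date range.
#   2. Use Spark to download each URL's `.json.gz` file into memory.
#   3. Decompress the downloaded file in memory once the download has completed.
#   4. Upload the decompressed `.json` data directly to the configured container/directory in Azure Blob Storage.
# - **Dependencies:** the `requests` and `azure-storage-blob` libraries must be installed on the cluster.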

# %%
# DBTITLE 1,Configuration
from datetime import datetime, timezone, timedelta
import os
from pyspark.sql import SparkSession

# ===================================================================
# Spark session initialisation
# ===================================================================
# Reuse the active Spark session, or create a new one if none exists.
spark = SparkSession.builder.appName("DownloadAndExtractToAzure").getOrCreate()
# ===================================================================

# ----- 1. Azure Storage settings -----
# SECURITY: the connection string embeds the storage account key, so it must
# never live in source control. It is read from the environment instead.
# The key that was previously hard-coded here should be treated as leaked
# and rotated in the Azure portal.
AZURE_CONNECTION_STRING = os.environ.get("AZURE_STORAGE_CONNECTION_STRING", "")
if not AZURE_CONNECTION_STRING:
    raise RuntimeError(
        "AZURE_STORAGE_CONNECTION_STRING is not set. Export the storage "
        "account connection string in the environment before running this notebook."
    )
STORAGE_ACCOUNT_NAME = "mlstorage05team"
CONTAINER_NAME = "ghachive"
TARGET_DIR = "raw"

# ----- 2. Download period (both dates inclusive, format YYYY-MM-DD) -----
START_DATE = "2025-08-30"
END_DATE = "2025-08-31"

# %%
# DBTITLE 1,Connect to Azure Storage
# Register the storage account key with the Spark session so that wasbs://
# paths on this account can be accessed.
# The key is the value between "AccountKey=" and the next ";". `partition`
# returns an empty tail when the marker is absent, so a missing marker and an
# empty key are both rejected by the same explicit check, and only the parsing
# (not the Spark call) can produce the parse error.
account_key = AZURE_CONNECTION_STRING.partition("AccountKey=")[2].split(";")[0]
if not account_key:
    raise ValueError("Could not parse AccountKey from the connection string.")

spark.conf.set(
    f"fs.azure.account.key.{STORAGE_ACCOUNT_NAME}.blob.core.windows.net",
    account_key,
)
print(f"Spark configuration for Azure Storage account '{STORAGE_ACCOUNT_NAME}' is set.")

# %%
# DBTITLE 1,Generate Download & Upload Plan
def hour_urls(start_utc, end_utc):
    """Yield ``(hour, url)`` pairs for every hour in the given range.

    Both bounds are truncated to the start of their hour and the range is
    inclusive at both ends. The URL uses the date as ``YYYY-MM-DD`` followed
    by the hour without zero padding.

    Args:
        start_utc: datetime marking the first hour to include.
        end_utc: datetime marking the last hour to include.

    Yields:
        Tuples of (hour-truncated datetime, GH Archive ``.json.gz`` URL).
    """
    one_hour = timedelta(hours=1)
    moment = start_utc.replace(minute=0, second=0, microsecond=0)
    last_hour = end_utc.replace(minute=0, second=0, microsecond=0)
    while moment <= last_hour:
        day_stamp = moment.strftime('%Y-%m-%d')
        yield moment, f"https://data.gharchive.org/{day_stamp}-{moment.hour}.json.gz"
        moment = moment + one_hour

# Build the UTC time window: midnight of START_DATE through 23:59:59 of END_DATE.
start_utc = datetime.strptime(START_DATE, "%Y-%m-%d").replace(tzinfo=timezone.utc)
end_utc = datetime.strptime(END_DATE, "%Y-%m-%d").replace(
    hour=23, minute=59, second=59, tzinfo=timezone.utc
)

# Full work list: one entry per hour, pairing the source URL with the
# destination blob path under TARGET_DIR.
tasks = [
    {
        "source_url": url,
        "dest_blob": f"{TARGET_DIR}/{hour_dt.strftime('%Y-%m-%d')}-{hour_dt.hour}.json",
    }
    for hour_dt, url in hour_urls(start_utc, end_utc)
]

tasks_df = spark.createDataFrame(tasks)

task_count = tasks_df.count()
print(f"Total {task_count} files will be processed.")
tasks_df.display()

# %%
# DBTITLE 1,Define Distributed ETL Function
from pyspark.sql.types import StructType, StructField, StringType
import pandas as pd

def process_and_upload(rows):
    """Download, decompress and upload each task; runs on Spark worker nodes.

    Written for ``DataFrame.mapInPandas``, which calls the function with an
    iterator of pandas DataFrames (one per batch) and expects an iterator of
    pandas DataFrames back. This is therefore a generator that yields one
    result frame per input batch.

    The connection string and container name are taken from the module-level
    ``AZURE_CONNECTION_STRING`` and ``CONTAINER_NAME`` instead of being
    duplicated here, so the secret is defined in exactly one place.

    Args:
        rows: Iterator of pandas DataFrames, each with ``source_url`` and
            ``dest_blob`` columns.

    Yields:
        pandas DataFrame with columns ``source_url`` and ``status``, where
        ``status`` is ``"SUCCESS"`` or ``"FAILED: <error message>"``.
    """
    # Imported here so the modules are resolved where the function executes.
    import gzip

    import requests
    from azure.storage.blob import BlobServiceClient

    blob_service_client = BlobServiceClient.from_connection_string(AZURE_CONNECTION_STRING)
    container_client = blob_service_client.get_container_client(CONTAINER_NAME)

    for batch in rows:
        results = []
        for url, blob_name in zip(batch["source_url"], batch["dest_blob"]):
            try:
                # The whole body is needed in memory for gzip.decompress, so a
                # plain (non-streaming) request is used; it also releases the
                # connection even when raise_for_status() fails.
                response = requests.get(url, timeout=120)
                response.raise_for_status()
                decompressed_data = gzip.decompress(response.content)
                container_client.upload_blob(name=blob_name, data=decompressed_data, overwrite=True)
                status = "SUCCESS"
            except Exception as e:
                # Deliberately best-effort: one bad file must not abort the
                # batch, so the error is recorded in the status column.
                status = f"FAILED: {str(e)}"

            results.append({"source_url": url, "status": status})

        # Explicit columns keep the frame shape stable even for an empty batch.
        yield pd.DataFrame(results, columns=["source_url", "status"])

# Schema of the frames produced by process_and_upload: two nullable string columns.
result_schema = StructType(
    [StructField(column, StringType(), True) for column in ("source_url", "status")]
)

# %%
# DBTITLE 1,Execute Job and Check Results
# Apply process_and_upload to the task list and show the per-file status table.
results_df = tasks_df.mapInPandas(
    process_and_upload,
    schema=result_schema,
)
display(results_df)

# %%
# DBTITLE 1,Verify Uploaded Files on Azure
# List the target directory through the wasbs:// path and print a short preview.
# Verification is best-effort: any failure is reported, not raised.
print(f"Verifying files in Azure container '{CONTAINER_NAME}/{TARGET_DIR}'...")
azure_path = (
    f"wasbs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/{TARGET_DIR}"
)
try:
    uploaded_files = dbutils.fs.ls(azure_path)
    print(f"Verification successful. Found {len(uploaded_files)} files in the target directory.")
    # Show at most the first five entries.
    preview = uploaded_files[:5]
    for f in preview:
        print(f" - {f.path}")
except Exception as e:
    print(f"Could not verify files. Error: {e}")

# Cell output: ModuleNotFoundError: No module named 'pyspark'