In [1]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from modules import extract_, compress_, load_
import pandas as pd

In [2]:
spark = SparkSession.builder.appName('Plan_B').getOrCreate()

In [3]:
# Plan_B : transfrom_flat 함수코드 수정
# 함수내부에서 session을 열고, api 요청으로 데이터 받아오는 기능 수행
# 함수는 pandas DF 를 반환

def transform_df(date):
    '''
    날짜를 입력하면 그 날짜의 모든 거래를 품목별 법인별로 집계하여\n
    단일 json 데이터로 반환하는 함수
    '''
    import os
    import math
    import json
    import pandas as pd
    from dotenv import load_dotenv
    load_dotenv()

    api_id = os.getenv('garak_id')
    api_pw = os.getenv('garak_passwd')
    url = 'http://www.garak.co.kr/publicdata/dataOpen.do?'

    bubin_list = ['11000101','11000102','11000103','11000104','11000105','11000106']
    pummok_list = ['감귤','감자','건고추','고구마','단감','당근','딸기','마늘','무',
                    '미나리','바나나','배','배추','버섯','사과','상추','생고추','수박',
                    '시금치','양배추','양상추','양파','오이','참외','토마토','파',
                    '포도','피망','호박']

    dict1 = {'data': []}
    for pummok in pummok_list:
        dict2 = {f'{pummok}': []}
        for bubin in bubin_list:
            params = (
                    ('id', api_id),
                    ('passwd', api_pw),
                    ('dataid', 'data12'),
                    ('pagesize', '10'),
                    ('pageidx', '1'),
                    ('portal.templet', 'false'),
                    ('s_date', date),
                    ('s_bubin', bubin),
                    ('s_pummok', pummok),
                    ('s_sangi', '')
                    )
            dict3 = {f'{bubin}': []}
            list_total_count = int(extract_.extract(url, params)['lists']['list_total_count'])
            total_page = math.ceil(int(list_total_count) / 10)

            if int(list_total_count) != 0:
                for page in range(1, total_page+1):
                    params = (
                                ('id', api_id),
                                ('passwd', api_pw),
                                ('dataid', 'data12'),
                                ('pagesize', '10'),
                                ('pageidx', page),
                                ('portal.templet', 'false'),
                                ('s_date', date),
                                ('s_bubin', bubin),
                                ('s_pummok', pummok),
                                ('s_sangi', '')
                             )
                    html_dict = extract_.extract(url, params)
                    if list_total_count % 10 > 1:
                        for i in range(len(html_dict['lists']['list'])):
                            dict3[f'{bubin}'].append({
                                'idx' : ((page -1) * 10) + (i + 1),
                                'PUMMOK' : html_dict['lists']['list'][i]['PUMMOK'],
                                'PUMJONG' : html_dict['lists']['list'][i]['PUMJONG'],
                                'UUN' : html_dict['lists']['list'][i]['UUN'],
                                'DDD' : html_dict['lists']['list'][i]['DDD'],
                                'PPRICE' : html_dict['lists']['list'][i]['PPRICE'],
                                'SSANGI' : html_dict['lists']['list'][i]['SSANGI'],
                                'CORP_NM' : html_dict['lists']['list'][i]['CORP_NM'],
                                'ADJ_DT' : html_dict['lists']['list'][i]['ADJ_DT']
                                })
                    elif list_total_count % 10 == 1:
                        if list_total_count > 1:
                            for i in range(10):
                                dict3[f'{bubin}'].append({
                                    'idx' : ((page -1) * 10) + (i + 1),
                                    'PUMMOK' : html_dict['lists']['list'][i]['PUMMOK'],
                                    'PUMJONG' : html_dict['lists']['list'][i]['PUMJONG'],
                                    'UUN' : html_dict['lists']['list'][i]['UUN'],
                                    'DDD' : html_dict['lists']['list'][i]['DDD'],
                                    'PPRICE' : html_dict['lists']['list'][i]['PPRICE'],
                                    'SSANGI' : html_dict['lists']['list'][i]['SSANGI'],
                                    'CORP_NM' : html_dict['lists']['list'][i]['CORP_NM'],
                                    'ADJ_DT' : html_dict['lists']['list'][i]['ADJ_DT']
                                    })
                            list_total_count -= 10
                        elif list_total_count == 1:
                            dict3[f'{bubin}'].append({
                                'idx' : int(html_dict['lists']['list_total_count']),
                                'PUMMOK' : html_dict['lists']['list']['PUMMOK'],
                                'PUMJONG' : html_dict['lists']['list']['PUMJONG'],
                                'UUN' : html_dict['lists']['list']['UUN'],
                                'DDD' : html_dict['lists']['list']['DDD'],
                                'PPRICE' : html_dict['lists']['list']['PPRICE'],
                                'SSANGI' : html_dict['lists']['list']['SSANGI'],
                                'CORP_NM' : html_dict['lists']['list']['CORP_NM'],
                                'ADJ_DT' : html_dict['lists']['list']['ADJ_DT']
                                })
                dict2[f'{pummok}'].append(dict3)
            else:
                pass
        dict1['data'].append(dict2)

    flattened_data = []
    for item_data in dict1['data']:
        for item, bubin_list in item_data.items():
            for bubin_data in bubin_list:
                for bubin, transactions in bubin_data.items():
                    for transaction in transactions:
                        flattened_row = transaction.copy()
                        flattened_row['item'] = item
                        flattened_row['bubin'] = bubin
                        flattened_data.append(flattened_row)

    df = pd.DataFrame(flattened_data)

    return df

In [4]:
schema = StructType([
    StructField("idx", IntegerType(), True),
    StructField("PUMMOK", StringType(), True),
    StructField("PUMJONG", StringType(), True),
    StructField("UUN", StringType(), True),
    StructField("DDD", StringType(), True),
    StructField("PPRICE", StringType(), True),
    StructField("SSANGI", StringType(), True),
    StructField("CORP_NM", StringType(), True),
    StructField("ADJ_DT", StringType(), True),
    StructField("item", StringType(), True),
    StructField("bubin", StringType(), True),
])

transform_df_udf = udf(transform_df, schema)

In [6]:
data = [("2023-04-01",), ("2023-04-02",), ("2023-04-03",)]
columns = ["date"]
input_df = spark.createDataFrame(data, columns)

In [7]:
result_df = input_df.withColumn("df_of_date", transform_df_udf(input_df["date"]))

In [8]:
result_df

DataFrame[date: string, df_of_date: struct<idx:int,PUMMOK:string,PUMJONG:string,UUN:string,DDD:string,PPRICE:string,SSANGI:string,CORP_NM:string,ADJ_DT:string,item:string,bubin:string>]

In [9]:
result_df.show()

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "C:\Users\wldnr\anaconda3\envs\cp2\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "C:\Users\wldnr\anaconda3\envs\cp2\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
  File "C:\Users\wldnr\anaconda3\envs\cp2\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "C:\Users\wldnr\anaconda3\envs\cp2\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "C:\Users\wldnr\anaconda3\envs\cp2\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 212, in _batched
    for item in iterator:
  File "C:\Users\wldnr\anaconda3\envs\cp2\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 450, in mapper
  File "C:\Users\wldnr\anaconda3\envs\cp2\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 450, in <genexpr>
  File "C:\Users\wldnr\anaconda3\envs\cp2\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 88, in <lambda>
  File "C:\Users\wldnr\anaconda3\envs\cp2\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\wldnr\AppData\Local\Temp\ipykernel_28284\4248806439.py", line 17, in transform_df
  File "c:\Users\wldnr\anaconda3\envs\cp2\lib\site-packages\dotenv\main.py", line 336, in load_dotenv
    dotenv_path = find_dotenv()
  File "c:\Users\wldnr\anaconda3\envs\cp2\lib\site-packages\dotenv\main.py", line 300, in find_dotenv
    for dirname in _walk_to_root(path):
  File "c:\Users\wldnr\anaconda3\envs\cp2\lib\site-packages\dotenv\main.py", line 257, in _walk_to_root
    raise IOError('Starting path not found')
OSError: Starting path not found
