# PySpark의 ALS 활용하기
### https://techblog-history-younghunjo1.tistory.com/161 참고

In [None]:
import sklearn
import random

In [None]:
!apt-get install openjdk-8-jdk-headless

In [None]:
!wget -q https://downloads.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz

In [None]:
# Unpack the Spark archive into the current directory; this is expected to
# produce spark-3.2.4-bin-hadoop3.2/, the directory SPARK_HOME points at below.
!tar -xvf spark-3.2.4-bin-hadoop3.2.tgz

In [None]:
# findspark is used below to make the unpacked Spark's pyspark importable.
!pip install findspark

In [None]:
import os

# Tell Spark where Java lives and tell findspark where the unpacked Spark
# distribution lives (both paths match the install/extract cells above).
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.2.4-bin-hadoop3.2",
)

In [None]:
import findspark
# With no argument, findspark.init() locates Spark via the SPARK_HOME
# environment variable set in the previous cell and makes `pyspark` importable.
findspark.init()

In [None]:
import sklearn
import random

# PySpark libraries #
# SQL
from pyspark import SparkConf, SparkContext

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
from pyspark.sql.types import *

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [None]:
# Create the Spark session (or reuse an existing one) for the rest of the notebook.
spark = (
    SparkSession.builder
    .appName('TOPIK Mate ALS')
    .getOrCreate()
)

In [None]:
import os
import pandas as pd
import numpy as np

In [None]:
# User data, loaded from the shared Google Drive folder.
# NOTE(review): presumably one row per submitted answer — confirm against the CSV.
users = pd.read_csv(
    '/content/drive/MyDrive/캡스톤디자인_공유폴더/datas/contents/users.csv'
)

In [None]:
# Remove the row whose index label is 0, then preview 20 random rows.
# NOTE(review): the reason for dropping this row is not visible here —
# presumably a placeholder/invalid first record; confirm.
users = users.drop(index=0)
users.sample(20)

In [None]:
# Question data; it presumably provides correct_answer, which is compared with
# user_answer later on — confirm against the CSV.
problems = pd.read_csv(
    '/content/drive/MyDrive/캡스톤디자인_공유폴더/datas/contents/questions.csv'
)
problems.sample(20)

In [None]:
# Attach question metadata to each answer row.
# Without `on=`, pandas performs an inner join on every column name the two
# frames share. NOTE(review): presumably that is only question_id — confirm.
solved = users.merge(problems)
solved.head()

In [None]:
# The merged frame is all that is needed from here on; drop the raw inputs.
del users, problems

In [None]:
# elapsed_time must be numeric for the percentile-based labelling below;
# pd.to_numeric raises if any value cannot be parsed.
solved['elapsed_time'] = pd.to_numeric(solved.elapsed_time)
solved.head()

In [None]:
# Group the answers by question so each question can be labelled on its own.
questions = solved.groupby('question_id')
questions.size()  # number of answer rows per question

In [None]:
# Drop the name `solved`; only the grouped view `questions` is used below.
# NOTE(review): the GroupBy object most likely still references this frame, so
# its memory may not actually be released until `questions` is deleted too.
del solved

In [None]:
# Label every answer by correctness and relative speed, per question:
#   correct, elapsed_time <= percentile of that question's correct answers -> 1
#   correct, slower                                                        -> 2
#   wrong,   elapsed_time >  percentile of that question's wrong answers   -> 4
#   wrong,   faster (<= percentile)                                        -> 5
# The percentile is computed separately within each question's correct and
# wrong subsets, so "fast" is relative to other answers to the same question.

SPEED_PERCENTILE = 15  # percentile separating "fast" from "slow" answers

# Set to an int (e.g. 1) to label only the first N questions while debugging.
# None labels every question; a hard-coded limit of 1 would feed ALS a single item.
MAX_QUESTIONS = None


def _label_by_speed(part, fast_label, slow_label, percentile=SPEED_PERCENTILE):
    """Return a copy of *part* with a float 'label' column added.

    Rows whose elapsed_time is <= the *percentile*-th percentile of *part*
    get *fast_label*; rows above it get *slow_label*. Rows that compare
    False both ways (NaN elapsed_time) keep a NaN label.
    """
    # Work on a copy so we never write into a view of the original group.
    labeled = part.copy()
    times = labeled['elapsed_time']
    pivot = np.percentile(times, percentile)
    labeled['label'] = np.nan  # float column, as required by the Spark schema
    labeled.loc[times <= pivot, 'label'] = fast_label
    labeled.loc[times > pivot, 'label'] = slow_label
    return labeled


labeled_parts = []
for idx, (key, group) in enumerate(questions):
    if MAX_QUESTIONS is not None and idx >= MAX_QUESTIONS:
        break

    # Correct answers
    correct_group = group[group['user_answer'] == group['correct_answer']]
    # Wrong answers
    wrong_group = group[group['user_answer'] != group['correct_answer']]

    if not correct_group.empty:
        labeled_parts.append(
            _label_by_speed(correct_group, fast_label=1, slow_label=2))
    if not wrong_group.empty:
        labeled_parts.append(
            _label_by_speed(wrong_group, fast_label=5, slow_label=4))

# Concatenate once at the end; repeated pd.concat inside the loop is quadratic.
result_df = pd.concat(labeled_parts, axis=0) if labeled_parts else pd.DataFrame()


In [None]:
# The labelled rows now live in result_df; release the grouped view.
del questions

In [None]:
# Preview 10 random labelled rows.
result_df.sample(n=10)

In [None]:
# ALS 추천 알고리즘
from pyspark.ml.recommendation import ALS

In [None]:
# Explicit schema for converting result_df to a Spark DataFrame
# (every field is nullable).
# NOTE(review): elapsed_time_num is not created anywhere in the visible code, and
# elapsed_time was converted to a numeric dtype earlier but is declared StringType
# here. An explicit schema needs one field per DataFrame column, in order — confirm
# this list matches result_df's columns.
df_schema = StructType([
    StructField(name, dtype, True)
    for name, dtype in [
        ("user_id", StringType()),
        ("solving_id", StringType()),
        ("question_id", StringType()),
        ("user_answer", StringType()),
        ("elapsed_time", StringType()),
        ("bundle_id", StringType()),
        ("explanation_id", StringType()),
        ("correct_answer", StringType()),
        ("part", StringType()),
        ("tags", StringType()),
        ("deployed_at", LongType()),
        ("elapsed_time_num", IntegerType()),
        ("label", FloatType()),
    ]
])

# Pandas -> Spark conversion
spark_df = spark.createDataFrame(result_df, schema=df_schema)
display(spark_df)

In [None]:
# ALS needs numeric user/item ids, so map the string user_id and question_id
# columns to numeric indices (new columns user_id_num and question_id_num).
stringIndexer = StringIndexer(inputCols=['user_id', 'question_id'],
                              outputCols=['user_id_num', 'question_id_num'])
# fit() learns the string -> index mapping; the fitted model's transform()
# returns spark_df with the *_num columns appended.
indexer_model = stringIndexer.fit(spark_df)
encoded_df = indexer_model.transform(spark_df)


In [None]:
# Preview the first 5 rows, including the new *_num index columns.
encoded_df.limit(5).show()

In [None]:
# Randomly split the rows 75% / 25% into training and test sets
# (no seed, so the split differs between runs).
train, test = encoded_df.randomSplit(weights=[0.75, 0.25])

In [None]:
# Collaborative filtering with ALS on (user, question, label) triples.
rec = ALS(maxIter=10,
          regParam=0.01,
          # ALS requires numeric id columns: use the StringIndexer outputs,
          # not the original string user_id / question_id columns.
          userCol='user_id_num',
          itemCol='question_id_num',
          ratingCol='label',
          nonnegative=True,          # constrain latent factors to be non-negative
          # Drop test rows whose user/question never appeared in training
          # (they would otherwise get NaN predictions).
          coldStartStrategy='drop')

# Train the ALS model on the training DataFrame.
rec_model = rec.fit(train)

# Predict labels for the test DataFrame with transform().
pred_labels = rec_model.transform(test)
pred_labels.limit(5).toPandas()