In [1]:
import pandas as pd

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()

from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, QuantileDiscretizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [48]:
df = spark.createDataFrame([
    ('AC1', 20220601, 'A'),
    ('AC1', 20220603, 'B'),
    ('AC1', 20220602, 'E'),
    ('AC1', 20220607, 'D'),
    ('AC1', 20220605, 'A'),
    ('AC2', 20220608, 'H'),
    ('AC2', 20220601, 'I'),
    ('AC2', 20220602, 'L'),
    ('AC2', 20220603, 'J'),
    ('AC2', 20220609, 'O'),
],['AC', 'dt', 'funcid'])

In [49]:
df.show()

+---+--------+------+
| AC|      dt|funcid|
+---+--------+------+
|AC1|20220601|     A|
|AC1|20220603|     B|
|AC1|20220602|     E|
|AC1|20220607|     D|
|AC1|20220605|     A|
|AC2|20220608|     H|
|AC2|20220601|     I|
|AC2|20220602|     L|
|AC2|20220603|     J|
|AC2|20220609|     O|
+---+--------+------+



In [50]:
from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('AC').orderBy('dt')

sorted_list_df = df.withColumn(
            'sorted_list', F.collect_list('funcid').over(w)
        )\
        .groupBy('AC')\
        .agg(F.max('sorted_list').alias('sorted_list'))

In [51]:
sorted_list_df.show()

+---+---------------+
| AC|    sorted_list|
+---+---------------+
|AC1|[A, E, B, A, D]|
|AC2|[I, L, J, H, O]|
+---+---------------+



In [61]:
sorted_list_df = sorted_list_df.withColumn('tmp', F.concat_ws('', F.col('sorted_list')))

In [62]:
sorted_list_df.show()

+---+---------------+-----+
| AC|    sorted_list|  tmp|
+---+---------------+-----+
|AC1|[A, E, B, A, D]|AEBAD|
|AC2|[I, L, J, H, O]|ILJHO|
+---+---------------+-----+



In [63]:
pdf = sorted_list_df.toPandas()

In [64]:
pdf

Unnamed: 0,AC,sorted_list,tmp
0,AC1,"[A, E, B, A, D]",AEBAD
1,AC2,"[I, L, J, H, O]",ILJHO


In [89]:
def n_gram(target, n=3):
  # 基準を1文字(単語)ずつ ずらしながらn文字分抜き出す
  return [target[idx:idx + n] for idx in range(len(target) - n + 1)]

In [93]:
import numpy as np
list_seq = np.array([])
for i in range(len(pdf)):
    funcid_seq = n_gram(pdf.loc[i, 'tmp'], 3)
    list_seq = np.append(list_seq, funcid_seq)

In [94]:
df_sequence = pd.DataFrame(data=list_seq, columns=['sequence'])

In [95]:
df_sequence

Unnamed: 0,sequence
0,AEB
1,EBA
2,BAD
3,ILJ
4,LJH
5,JHO


In [101]:
df_tmp = pd.DataFrame(df_sequence['sequence'].value_counts()).reset_index().rename(columns={'index': 'heyyo'})

In [102]:
df_tmp

Unnamed: 0,heyyo,sequence
0,AEB,1
1,EBA,1
2,BAD,1
3,ILJ,1
4,LJH,1
5,JHO,1


In [105]:
df_tmp['ratio'] = df_tmp['sequence'] / df_tmp['sequence'].sum()

In [106]:
df_tmp

Unnamed: 0,heyyo,sequence,ratio
0,AEB,1,0.166667
1,EBA,1,0.166667
2,BAD,1,0.166667
3,ILJ,1,0.166667
4,LJH,1,0.166667
5,JHO,1,0.166667


In [36]:
import re

def remove_symbol(text):
    text = str(text)
    code_regex = re.compile(r'[[],]')
    cleaned_text = code_regex.sub(' ', cleaned_text)
    return cleaned_text

In [37]:
pdf['tmp'] = pdf['sorted_list'].apply(remove_symbol)

UnboundLocalError: local variable 'cleaned_text' referenced before assignment

In [24]:
pdf.dtypes

AC             object
sorted_list    object
dtype: object