In [40]:
# Relative path of the parquet data that is read into `df` further down.
data_file_path = "finance_complaint/finance_artifact/data_preprocessing/20220906_130727/complaint_data"

In [7]:
from finance_complaint.entity.spark_manager import spark_session

In [None]:
# Display the imported spark_session object.
spark_session

In [3]:
# Read the parquet data at data_file_path into a Spark DataFrame.
df = spark_session.read.parquet(data_file_path)

In [4]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

In [5]:
# Column groups, named after the treatment each group is meant to receive.

# Fed to the StringIndexer below and used as the null-fill columns.
ONE_HOT_FEATURES = [
    'company_response',
    'consumer_consent_provided',
    'submitted_via',
    'timely',
]

# Numeric column(s).
NUMERICAL_FEATURE = ['diff_in_days']

# Presumably intended for frequency encoding (judging by the name) -- TODO confirm.
FREQUENCY_ENCODER = [
    'company',
    'issue',
    'product',
    'state',
    'zip_code',
]

# Disabled for now:
# TOKENIZER = ['complaint_what_happened']

# Label column.
TARGET_FEATURE = ['consumer_disputed']

In [6]:
# Each one-hot candidate column gets an indexed counterpart named idx_<feature>.
indexed_feature_names = [f"idx_{feature}" for feature in ONE_HOT_FEATURES]
indexer = StringIndexer(
    inputCols=ONE_HOT_FEATURES,
    outputCols=indexed_feature_names,
)

In [7]:
from pyspark.ml.feature import Imputer


In [8]:
from pyspark.sql.functions import col

In [9]:
from pyspark.sql import DataFrame

In [10]:
from pyspark.sql.functions import desc

In [11]:
from typing import List

In [12]:
def get_top_category(dataframe:DataFrame,columns:List[str]):
    """Return the most frequent non-null value of each requested column.

    Args:
        dataframe: Spark DataFrame to inspect.
        columns: Names of the columns whose most frequent value is wanted.

    Returns:
        dict mapping column name -> most frequent non-null value in that
        column. A column that has no non-null rows is left out, so the
        result can be handed directly to ``DataFrame.na.fill``.
    """
    top_category = dict()
    for column in columns:
        # Work on the frame that was passed in; the previous version read the
        # notebook-global `df` and silently ignored the `dataframe` argument.
        # Nulls are dropped first because a null "top category" cannot be used
        # as a fill value.
        top_rows = (
            dataframe.na.drop(subset=[column])
            .groupBy(column)
            .count()
            .sort(desc('count'))
            .take(1)
        )
        if not top_rows:
            # Empty frame or all-null column: nothing to report for it.
            continue

        # One Spark job per column; the row is reused for both the log line
        # and the returned value.
        top_cat = top_rows[0]
        print(top_cat)
        top_category[column] = top_cat[column]

    return top_category



In [97]:
# Most frequent value of every ONE_HOT_FEATURES column (printed per column, then returned as a dict).
get_top_category(df,ONE_HOT_FEATURES)

Row(company_response='Closed with explanation', count=580728)
Row(consumer_consent_provided='N/A', count=470178)
Row(submitted_via='Web', count=524238)
Row(timely='Yes', count=748267)


{'company_response': 'Closed with explanation',
 'consumer_consent_provided': 'N/A',
 'submitted_via': 'Web',
 'timely': 'Yes'}

In [99]:
# Number of rows whose zip_code is null.
df.filter(col('zip_code').isNull()).count()

5660

In [14]:
# Attach every column name to df as a plain string attribute
# (after this, df.<name> evaluates to the string "<name>").
# The loop variable is deliberately NOT called `col`: that name is the imported
# pyspark.sql.functions.col, and rebinding it to a string breaks every later
# `col(...)` call in the notebook.
for column_name in df.columns:
    setattr(df,column_name,column_name)

In [101]:
# Fill nulls using the {column: value} dict returned by get_top_category for the ONE_HOT_FEATURES columns.
# Note: this rebinds `df` to the filled frame.
df = df.na.fill(get_top_category(df,ONE_HOT_FEATURES))

Row(company_response='Closed with explanation', count=580728)
Row(consumer_consent_provided='N/A', count=470178)
Row(submitted_via='Web', count=524238)
Row(timely='Yes', count=748267)


In [16]:
# List the column names of df.
df.columns

['company',
 'company_response',
 'complaint_what_happened',
 'consumer_consent_provided',
 'consumer_disputed',
 'issue',
 'product',
 'state',
 'submitted_via',
 'timely',
 'zip_code',
 'diff_in_days']

In [None]:
# Sum of diff_in_days over the whole frame. The cell previously started with a
# bare `.agg(...)`, which is a SyntaxError; `df` is the frame every neighbouring
# cell operates on.
df.agg({"diff_in_days":"sum"}).show()

In [30]:
# Print column names, types and nullability of df.
df.printSchema()

root
 |-- company: string (nullable = true)
 |-- company_response: string (nullable = true)
 |-- complaint_what_happened: string (nullable = true)
 |-- consumer_consent_provided: string (nullable = true)
 |-- consumer_disputed: string (nullable = true)
 |-- issue: string (nullable = true)
 |-- product: string (nullable = true)
 |-- state: string (nullable = true)
 |-- submitted_via: string (nullable = true)
 |-- timely: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- diff_in_days: double (nullable = true)



In [39]:
# Number of rows whose diff_in_days is null.
# The column name is written directly into the SQL condition. Interpolating
# `df.diff_in_days` only produced valid SQL when that attribute had been
# overwritten with a plain string on this exact df object; on a regular
# DataFrame it is a Column and its repr is not a valid expression.
df.filter("diff_in_days is null").count()

769598

In [None]:
# Row count per zip_code, null group removed, largest groups first.
(
    df.groupBy(['zip_code'])
    .count()
    .filter('zip_code is not null')
    .sort(desc('count'))
    .show()
)

In [65]:
from pyspark.ml.base import Transformer

In [None]:
class StandardScaler(Transformer):
    """Skeleton of a custom scaler derived from ``pyspark.ml.base.Transformer``.

    NOTE(review): no ``_transform`` is defined here, so the class is still a
    stub -- confirm the intended behaviour before relying on it.
    """

    def __init__(self,*args,**kwargs) -> None:
        """Initialise the base Transformer.

        ``*args`` / ``**kwargs`` are accepted for call compatibility but are
        not used yet.
        """
        # The base initializer takes no arguments; passing the kwargs dict
        # positionally raised a TypeError.
        super().__init__()
        

    

In [66]:
from typing import overload

In [1]:

from finance_complaint.ml.feature import StandardScaler
    

In [9]:
from pyspark.ml.feature import VectorAssembler

In [11]:
# Display df's repr (column names and types).
df

DataFrame[id: bigint, text: string, label: double, v_labest: vector]

In [10]:
# Wrap the scalar `label` column into the vector column `v_labest`.
# Both `df` and `vec_model` end up bound to the transformed DataFrame.
label_assembler = VectorAssembler(inputCols=["label"], outputCol="v_labest")
df = vec_model = label_assembler.transform(df)

In [None]:


# $example on$

# Scaler that reads the `v_labest` vector column and writes `scaled_label`,
# configured with withStd=True and withMean=False.
scaler = StandardScaler(
    inputCol="v_labest",
    outputCol="scaled_label",
    withStd=True,
    withMean=False,
)

# Fit on df to obtain the model.
scalerModel = scaler.fit(df)

# Apply the fitted model to the same frame and display the result.
scaledData = scalerModel.transform(df)
scaledData.show()
# $example off$


In [88]:
# Display df's repr (column names and types).
df

DataFrame[id: bigint, text: string, label: double]

In [91]:


# One-hot encode the `label` column into the vector column `categoryVec2`:
# fit on df, apply the fitted model to df, then print the result.
encoder = OneHotEncoder(inputCol="label", outputCol="categoryVec2")
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()

+---+----------------+-----+-------------+
| id|            text|label| categoryVec2|
+---+----------------+-----+-------------+
|  0| a b c d e spark|  1.0|    (1,[],[])|
|  1|             b d|  0.0|(1,[0],[1.0])|
|  2|     spark f g h|  1.0|    (1,[],[])|
|  3|hadoop mapreduce|  0.0|(1,[0],[1.0])|
+---+----------------+-----+-------------+



In [15]:
# Print the first rows of df as a text table.
df.show()

+---+----------------+-----+--------+
| id|            text|label|v_labest|
+---+----------------+-----+--------+
|  0| a b c d e spark|  1.0|   [1.0]|
|  1|             b d|  0.0|   [0.0]|
|  2|     spark f g h|  1.0|   [1.0]|
|  3|hadoop mapreduce|  0.0|   [0.0]|
+---+----------------+-----+--------+

