# Exploratory Data Analysis

## Context

**Column Description**


| Column | Type | Description |
|--------|------|:------------|
| **BibNum** | Integer | Bibliographic record number identifying the catalog entry |
| **Title** | String | Title of the item |
| **Author** | String | Author of the item |
| **ISBN** | String | ISBN associated with the item |
| **PublicationYear** | String | Publication year as recorded in the catalog (for example `[1971]`) |
| **Publisher** | String | Publisher of the item |
| **Subjects** | String | Comma-separated subject headings |
| **ItemType** | String | Item type code (for example `acbk`, `arbk`, `jcbk`) |
| **ItemCollection** | String | Collection code (for example `canf`, `caref`) |
| **FloatingItem** | String | Floating-item label (`NA` in the sample rows) |
| **ItemLocation** | String | Location code of the owning branch (for example `cen`) |
| **ReportDate** | String | Date of the inventory snapshot (for example `2017-09-01`) |
| **ItemCount** | String | Number of items counted for this record |



In [28]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Explicit schema: every column except BibNum is read as a string.
customSchema = StructType([
    StructField("BibNum", IntegerType(), True),
    StructField("Title", StringType(), True),
    StructField("Author", StringType(), True),
    StructField("ISBN", StringType(), True),
    StructField("PublicationYear", StringType(), True),
    StructField("Publisher", StringType(), True),
    StructField("Subjects", StringType(), True),
    StructField("ItemType", StringType(), True),
    StructField("ItemCollection", StringType(), True),
    StructField("FloatingItem", StringType(), True),
    StructField("ItemLocation", StringType(), True),
    StructField("ReportDate", StringType(), True),
    StructField("ItemCount", StringType(), True),
])

In [31]:
df = spark.read.format("csv")\
.option("header", "true")\
.schema(customSchema)\
.load("seattle-library-sample.csv")

In [32]:
df.dataframeName='seattle-library-sample'

In [43]:
df.cache()

DataFrame[BibNum: int, Title: string, Author: string, ISBN: string, PublicationYear: string, Publisher: string, Subjects: string, ItemType: string, ItemCollection: string, FloatingItem: string, ItemLocation: string, ReportDate: string, ItemCount: string]

In [103]:
print("Number of rows: " + str(df.count()))

Number of rows: 347484


In [45]:
cols = df.columns
print("Number of columns: " + str(len(cols)))

Number of columns: 13


In [104]:
df.printSchema()

root
 |-- BibNum: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Author: string (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- PublicationYear: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- Subjects: string (nullable = true)
 |-- ItemType: string (nullable = true)
 |-- ItemCollection: string (nullable = true)
 |-- FloatingItem: string (nullable = true)
 |-- ItemLocation: string (nullable = true)
 |-- ReportDate: string (nullable = true)
 |-- ItemCount: string (nullable = true)



In [107]:
#df.show(20,False)
#df.show()
df.show(n=2)

+------+--------------------+--------------------+----------+---------------+--------------------+--------------------+--------+--------------+------------+------------+--------------------+---------+
|BibNum|               Title|              Author|      ISBN|PublicationYear|           Publisher|            Subjects|ItemType|ItemCollection|FloatingItem|ItemLocation|          ReportDate|ItemCount|
+------+--------------------+--------------------+----------+---------------+--------------------+--------------------+--------+--------------+------------+------------+--------------------+---------+
|  6845|Autumn of glory; ...|Connelly, Thomas ...|0807104450|         [1971]|Louisiana State U...|Confederate State...|    acbk|          canf|          NA|         cen|2017-09-01T00:00:...|        1|
|  7118|Variety music cav...|Mattfeld, Julius,...|0139407189|         [1971]|       Prentice-Hall|Popular music Uni...|    arbk|         caref|          NA|         cen|2017-09-01T00:00:...|      

In [154]:
#df.filter(df.ItemCollection == "cchol").show(truncate=False)
#df.select("Subjects").where("ItemCollection"=="a")
#df.select("Subjects").filter("ItemCollection = 'cchol'").show()
df.select("Subjects").filter("ItemCollection = 'cchol'").show(truncate=False)

+--------------------+
|            Subjects|
+--------------------+
|[Folklore Austria...|
|[Moles Animals Ju...|
|[Christmas poetry...|
|[Groundhog Day Ju...|
|[Fear Juvenile fi...|
|[Mice Juvenile fi...|
|[Memorial Day Juv...|
|[Ballets Stories ...|
|[Christmas Fictio...|
|[Kwanzaa Juvenile...|
|[Lambchop Stanley...|
|[Ghosts Fiction, ...|
|[Snowmen Juvenile...|
|[Individuality Ju...|
|[Christmas decora...|
|[Valentines Day J...|
|[Easter Bunny Juv...|
|[Childrens songs ...|
|[Stories in rhyme...|
|[All Souls Day Me...|
+--------------------+
only showing top 20 rows



In [110]:
#df.show(truncate=False)
df.select("Subjects").show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Subjects                                                                                                                                                                                                        |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Confederate States of America Army Department of Tennessee, United States History Civil War 1861 1865 Regimental histories                                                                                      |
|Popular music United States Bibliography, Music United States Chronology                                                                                   

In [118]:
from pyspark.sql.functions import col, split

# Split the comma-separated Subjects string into an array of subject headings.
# split() already returns array<string>, so no cast or alias is needed.
df = df.withColumn("Subjects", split(col("Subjects"), r",\s*"))
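
The pattern `,\s*` passed to `split` matches a comma followed by any amount of whitespace. As a minimal sketch, the same pattern can be tried in plain Python with `re.split`. This is an illustration only: Spark applies Java regular expressions, although this simple pattern should behave the same way in both. The sample string is the first `Subjects` value shown in the output above.

```python
import re

# First Subjects value from the sample output above.
subjects = (
    "Confederate States of America Army Department of Tennessee, "
    "United States History Civil War 1861 1865 Regimental histories"
)

# Split on a comma followed by optional whitespace, as in the Spark call.
parts = re.split(r",\s*", subjects)
print(parts[0])
```

The first element corresponds to what `lista[0][0][0]` returns once the column has been split in Spark.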

In [130]:
#df.printSchema()
#df.select("Subjects").show(truncate=False)
#show(n=1, truncate=False)
lista = df.select("Subjects").collect()

In [136]:
lista[0][0][0]

'Confederate States of America Army Department of Tennessee'

## Data cleansing / preparation

In [41]:
# Count the rows that have no null value in any column
df.dropna().count()

246464

In [71]:
#df.na.df.show()
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+------+-----+------+-----+---------------+---------+--------+--------+--------------+------------+------------+----------+---------+
|BibNum|Title|Author| ISBN|PublicationYear|Publisher|Subjects|ItemType|ItemCollection|FloatingItem|ItemLocation|ReportDate|ItemCount|
+------+-----+------+-----+---------------+---------+--------+--------+--------------+------------+------------+----------+---------+
|     0| 1843| 54965|80224|           4477|     2954|    8322|     125|           118|         114|         109|       111|      112|
+------+-----+------+-----+---------------+---------+--------+--------+--------------+------------+------------+----------+---------+



In [82]:
df.select([count(when(isnan('Subjects') | col('Subjects').isNull() , True))]).show()

+----------------------------------------------------------------------+
|count(CASE WHEN (isnan(Subjects) OR (Subjects IS NULL)) THEN true END)|
+----------------------------------------------------------------------+
|                                                                     0|
+----------------------------------------------------------------------+



In [72]:
df.where(col("Subjects").isNull()).count() # check null values

8322

In [73]:
df.where(col("ItemType").isNull()).count() # check null values

125

In [74]:
df.where(col("ItemCollection").isNull()).count() # check null values

118

In [75]:
df.count()

355806

In [76]:
df = df.na.drop(subset=["Subjects"]) ## Drop null values

In [77]:
df.where(col("Subjects").isNull()).count() # check null values

0

In [78]:
df.count() # check number of rows

347484

In [None]:
# Columns that will not be used as features
# TODO: review this list
cols_non_features = ["ReportDate", "Publisher", "PublicationYear", "BibNum", "ISBN"]

## Analyzing

In [52]:
from pyspark.sql.functions import count
df.select(count("ItemType")).show()
df.select(count("ItemCollection")).show()

+---------------+
|count(ItemType)|
+---------------+
|         355681|
+---------------+

+---------------------+
|count(ItemCollection)|
+---------------------+
|               355688|
+---------------------+



In [53]:
from pyspark.sql.functions import countDistinct
df.select(countDistinct("ItemType")).show()
df.select(countDistinct("ItemCollection")).show()

+------------------------+
|count(DISTINCT ItemType)|
+------------------------+
|                     674|
+------------------------+

+------------------------------+
|count(DISTINCT ItemCollection)|
+------------------------------+
|                           594|
+------------------------------+



In [54]:
from pyspark.sql.functions import first, last
df.select(first("ItemType"), last("ItemType")).show()

+---------------+--------------+
|first(ItemType)|last(ItemType)|
+---------------+--------------+
|           acbk|          acbk|
+---------------+--------------+



In [55]:
from pyspark.sql.functions import collect_set, collect_list
df.agg(collect_set("ItemCollection"), collect_list("ItemCollection")).show(20,True)

+---------------------------+----------------------------+
|collect_set(ItemCollection)|collect_list(ItemCollection)|
+---------------------------+----------------------------+
|       [naesl, Elephants...|        [canf, canf, care...|
+---------------------------+----------------------------+



In [57]:
from pyspark.sql.functions import collect_set
df.agg(collect_set("ItemCollection")).show(20,False)

[output truncated: a single row holding the set of distinct ItemCollection values]

In [58]:
from pyspark.sql.functions import collect_list
df.agg(collect_list("ItemCollection")).show(1,True)

+----------------------------+
|collect_list(ItemCollection)|
+----------------------------+
|        [canf, canf, care...|
+----------------------------+



In [59]:
df.groupBy("Subjects", "ItemType", "ItemCollection").count().show()

+--------------------+--------+--------------+-----+
|            Subjects|ItemType|ItemCollection|count|
+--------------------+--------+--------------+-----+
|African American ...|    arbk|          cabr|    1|
|Sports cars Great...|    arbk|          cs7r|    2|
|Edmonds Wash History|    arbk|         caref|    2|
|American bison, W...|    arbk|          cs9r|    1|
|Pirates Juvenile ...|    jcbk|          ncef|    3|
|Architecture Comp...|    acbk|          canf|    1|
|Finance Personal ...|    jcbk|          ccnf|    1|
|Grunge music, Roc...|    accd|        naover|    3|
|Boston Tea Party ...|    jcbk|          ncnf|    3|
|Thought and think...|    acbk|          nanf|    2|
|Animals Fiction, ...|    jcbk|         ncpic|    6|
|Young women Ficti...|    acbk|       nalpfic|    1|
|Popular instrumen...|    accd|          cacd|    1|
|Fathers and sons ...|    acbk|         nafic|    3|
|Probabilities, Ch...|    acbk|          nanf|    1|
|Gorilla Juvenile ...|    jcbk|         ncpic|

In [64]:
from pyspark.sql.functions import countDistinct, expr
df.groupBy("ItemCollection").agg(
    countDistinct("ItemType").alias("DistinctItemTypes"),
    expr("count(ItemType)")).show()

+--------------------+-----------------+---------------+
|      ItemCollection|DistinctItemTypes|count(ItemType)|
+--------------------+-----------------+---------------+
|Feynman Richard P...|                1|              2|
|               cceck|                1|              3|
|               ccfft|                1|            493|
|       Kondo, Robert|                1|              1|
|               jcdvd|                1|              1|
|               1947.|                0|              0|
|     Philomel Books,|                1|              4|
|Drew Nancy Fictit...|                1|              1|
|    Adultery Fiction|                1|              1|
|              carefo|                1|            168|
|              cavalo|                2|             34|
|                nccd|                3|           1322|
|                cs6r|                6|           3990|
|Physical fitness,...|                1|              1|
|Private investiga...|         

## Feature Transformation

In [87]:
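from pyspark.ml.feature import OneHotEncoder, StringIndexer
# OneHotEncoder expects a numeric category index, so it reads the StringIndexer
# output (ItemTypeIndex, created below) and writes to a separate column
# (ItemTypeVec is an illustrative name).
encoder = OneHotEncoder()\
.setInputCol("ItemTypeIndex")\
.setOutputCol("ItemTypeVec")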

In [89]:
#encoder.transform(df.select("ItemType")).show()
#encoder.show()

In [90]:
stringIndexer = StringIndexer(inputCol="ItemType", outputCol="ItemTypeIndex")

In [91]:
model = stringIndexer.fit(df)

In [111]:
indexed = model.transform(df)
#indexed.show()
#encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
#encoded = encoder.transform(indexed)
#encoded.show()
indexed.select("ItemType","ItemTypeIndex").show()

+--------+-------------+
|ItemType|ItemTypeIndex|
+--------+-------------+
|    acbk|          0.0|
|    arbk|          2.0|
|    arbk|          2.0|
|    acbk|          0.0|
|    arbk|          2.0|
|    acbk|          0.0|
|    acbk|          0.0|
|    arbk|          2.0|
|    acbk|          0.0|
|    arbk|          2.0|
|    jcbk|          1.0|
|    jcbk|          1.0|
|    arbk|          2.0|
|    arbk|          2.0|
|    arbk|          2.0|
|    arbk|          2.0|
|    acbk|          0.0|
|    arbk|          2.0|
|    acbk|          0.0|
|    acbk|          0.0|
+--------+-------------+
only showing top 20 rows



In [94]:
#replace_cols = [ 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection',
#                'TechSupport','StreamingTV', 'StreamingMovies']
#for i in replace_cols : 
#    df  = df.select(i).replace(["No internet service"], ["No"], i)

In [95]:
#from pyspark.sql.functions import regexp_replace, col
#replace_cols = [ 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection',
#                'TechSupport','StreamingTV', 'StreamingMovies']
#for i in replace_cols :
#    df = df.withColumn(i, regexp_replace(i, "No internet service", "No"))

In [98]:
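from pyspark.ml.feature import RFormula
# NOTE: the formula refers to a `label` column, which is not among the 13 columns
# loaded above; it has to be created (or the formula changed) before fitting.
supervised = RFormula(formula="label ~ . + ItemType:Subjects + ItemType:ItemLocation + ItemType:Author")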

In [102]:
#fittedRF = supervised.fit(df)
#preparedDF = fittedRF.transform(df)
#preparedDF.show(1)
df.select("Subjects").show()

+--------------------+
|            Subjects|
+--------------------+
|Confederate State...|
|Popular music Uni...|
|Historical fictio...|
|Erotica, Obscenit...|
|Electronics Dicti...|
|Tannenberg Battle...|
|Slavery United St...|
|Palynology United...|
|Chinese poetry Tr...|
|Money, Foreign ex...|
|Fantasy, Stories ...|
|Circus Poetry, St...|
|Winlock Wash History|
|Water resources d...|
|National Airlines...|
|Knappton Wash His...|
|English literatur...|
|Europe History 17...|
|Lamy John Baptist...|
|Roethke Theodore ...|
+--------------------+
only showing top 20 rows



In [None]:
# NOTE: preparedDF only exists once the commented-out RFormula fit/transform above has been run.
train, test = preparedDF.randomSplit([0.7, 0.3])  # 70/30 train/test split

## Classification

In [79]:
from pyspark.ml.classification import LogisticRegression

In [81]:
train_df, test_df = df.randomSplit([0.7, 0.3])