# Iteration 4_INFOSYS 722

In [41]:
# This cell starts the Data Preparation stage of Iteration 4.
# Each relevant cell below is numbered to match the answer it addresses.

In [42]:
import os

import findspark
# Locate the Spark installation before importing pyspark: honour SPARK_HOME
# when it is set, otherwise fall back to the path this notebook was built on.
findspark.init(os.environ.get('SPARK_HOME', '/home/ubuntu/spark-2.1.1-bin-hadoop2.7'))
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('iteration4').getOrCreate()
from pyspark.ml.classification import LogisticRegression

In [None]:
# Load the raw report data; the first row holds column names and Spark
# infers each column's type from the values.
dataFrame = spark.read.options(header=True, inferSchema=True).csv('Dataset/data.csv')

In [None]:
# Preview the first 20 rows of the loaded dataset.
dataFrame.show(20)

## 2.3 Data Exploration

In [None]:
# Compute summary statistics for every column of the dataset.
dataFrame.describe().show()

# Count the reports whose offence group is AggravatedSexualAssault.
aggSexAssuaultTable = dataFrame.filter(dataFrame.ANZSOCGroup == "AggravatedSexualAssault")
print("\nTotal of aggravatedSexualAssault reports:", aggSexAssuaultTable.count())

# Count the reports whose offence group is Non-AggravatedSexualAssault.
nonAggravatedSexualAssault = dataFrame.filter(dataFrame.ANZSOCGroup == "Non-AggravatedSexualAssault")
print("\nTotal of Non-AggravatedSexualAssault reports:", nonAggravatedSexualAssault.count())


## 3 Data Preparation
### 3.2 To clean the data, issues must be made explicit, then explicitly resolved.

In [None]:
# Show the original number of rows before cleaning.
# DataFrame.count() returns the number of rows, so the label says "rows".
print("Total data rows before cleaning:", dataFrame.count())

In [None]:
# Remove every row that contains a null value; the raw dataFrame is kept
# unchanged and the cleaned result gets its own name.
droppedTable = dataFrame.dropna()

In [None]:
# Show how many rows remain once rows with nulls have been dropped.
# DataFrame.count() returns the number of rows, so the label says "rows".
print("Total data rows after cleaning:", droppedTable.count())

droppedTable.show()

### 3.3 Data must be appropriately constructed through the creation of new features/variables, and/or data repositories/tables. 

In [None]:
from pyspark.sql.functions import format_number, col

# Table ordered by offence type only (kept for any later cells that use it).
groupOffenceType = droppedTable.sort('ANZSOCGroup', ascending=True)

# Order by Region, then by offence type within each region. Chaining a second
# .sort() would discard the first ordering instead of refining it, so both
# keys are given in a single sort call.
sortedTable = droppedTable.sort('Region', 'ANZSOCGroup', ascending=True)

In [None]:
import pandas as pd

# Pull the first twenty sorted rows to the driver and render them as a
# pandas table for a readable preview.
first_rows = sortedTable.head(20)
pd.DataFrame(first_rows, columns=sortedTable.columns)

In [None]:
# Count the reports in each offence group and display them as a pandas table.
sortedTable.groupBy('ANZSOCGroup').count().toPandas()

## 7 Data Mining

In [None]:
# Inspect the prepared table: its schema (column types) and column names.
sortedTable.printSchema()
print(list(sortedTable.columns))

In [None]:
# Import the relevant packages.
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,OneHotEncoder,StringIndexer)

In [None]:
# Encode each categorical string column as a numeric index.
# The offence group (ANZSOCGroup) is the prediction target, so its index goes
# into the 'label' column. NOTE: per the Spark ML docs, StringIndexer orders
# indices by label frequency by default, so which offence type becomes 0.0
# depends on the data rather than on a fixed convention.
offenceType_indexer = StringIndexer(inputCol='ANZSOCGroup', outputCol='label')

# Feature indexers, built from (source string column, index output column) pairs.
(areaName_indexer,
 region_indexer,
 occurDay_indexer,
 weapon_indexer,
 yearMonth_indexer) = [StringIndexer(inputCol=source, outputCol=target)
                       for source, target in (('MapDetailName', 'areaIndex'),
                                              ('Region', 'regionIndex'),
                                              ('OccurrenceDayOfWeek', 'occurDayIndex'),
                                              ('Weapon', 'weaponIndex'),
                                              ('YearMonth', 'yearMonthIndex'))]

# One-hot encode each index column into its own vector column; one encoder
# per column (they are combined later by the assembler).
(areaName_encoder,
 region_encoder,
 occurDay_encoder,
 weapon_encoder,
 yearMonth_encoder) = [OneHotEncoder(inputCol=source, outputCol=target)
                       for source, target in (('areaIndex', 'areaVec'),
                                              ('regionIndex', 'regionVec'),
                                              ('occurDayIndex', 'occurDayVec'),
                                              ('weaponIndex', 'weaponVec'),
                                              ('yearMonthIndex', 'yearMonthVec'))]

# Combine the one-hot vectors plus NumberofVictimisations and NumberofRecords
# into a single 'features' vector column. Assumes those two columns were
# inferred as numeric by inferSchema — TODO confirm (VectorAssembler needs
# numeric/vector inputs).
assembler = VectorAssembler(
    outputCol="features",
    inputCols=['areaVec', 'regionVec', 'occurDayVec',
               'weaponVec', 'yearMonthVec', 'NumberofVictimisations',
               'NumberofRecords'],
)

In [None]:
from pyspark.ml import Pipeline

# Chain every preparation stage. offenceType_indexer comes first because it
# creates the 'label' column; without it the select('label', 'features')
# below fails because 'label' would not exist.
pipeline = Pipeline(stages=[offenceType_indexer,
                            areaName_indexer, region_indexer, occurDay_indexer, weapon_indexer, yearMonth_indexer,
                            areaName_encoder, region_encoder, occurDay_encoder, weapon_encoder, yearMonth_encoder,
                            assembler])

# Fit the stages (e.g. the indexers learn their label mappings) on the prepared table.
pipeline_model = pipeline.fit(sortedTable)

# Apply the fitted stages to add the index, one-hot and feature columns.
pipe_df = pipeline_model.transform(sortedTable)

# Keep only the target and the assembled feature vector for modelling.
pipe_df = pipe_df.select('label', 'features')