In [0]:

# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [0]:

# Start Spark session
from pyspark.sql import SparkSession
input_col = 'transport'
spark = SparkSession.builder.appName(input_col).getOrCreate()

In [3]:

# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://disaster-response-final-project.s3.amazonaws.com/pipe_combined_test_training_fixed.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("pipe_combined_test_training_fixed.csv"), sep='|', header=True)

# Show DataFrame
df.show()

+---+--------------------+------+-------+-------+-----+-----------+------------+----------------+-----------------+--------+--------+-----------+-----+----+-------+--------+-----+--------------+--------+-----+---------+----------------------+---------+---------+-----------+-----+---------+-----+-----------+--------------------+---------------+------+-----+----+----------+----+-------------+-------------+
| id|             message| genre|related|request|offer|aid_related|medical_help|medical_products|search_and_rescue|security|military|child_alone|water|food|shelter|clothing|money|missing_people|refugees|death|other_aid|infrastructure_related|transport|buildings|electricity|tools|hospitals|shops|aid_centers|other_infrastructure|weather_related|floods|storm|fire|earthquake|cold|other_weather|direct_report|
+---+--------------------+------+-------+-------+-----+-----------+------------+----------------+-----------------+--------+--------+-----------+-----+----+-------+--------+-----+-----

In [4]:
df.select("shelter").distinct().show()

+-------+
|shelter|
+-------+
|      0|
|      1|
+-------+



In [5]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
data_df = df.withColumn('length', length(df['message']))
data_df.show(20, False)

+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-------+-------+-----+-----------+------------+----------------+-----------------+--------+--------+-----------+-----+----+-------+--------+-----+--------------+--------+-----+---------+----------------------+---------+---------+-----------+-----+---------+-----+-----------+--------------------+---------------+------+-----+----+----------+----+-------------+-------------+------+
|id |message                                                                                                                                                                                                              |genre |related|request|offer|aid_related|medical_help|medical_products|search_and_rescue|security|military|child_alone|water|food|shelter|clothing|money|missing_people|refug

In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
# input_col = 'water'
# val indexer = new StringIndexer().setInputCol("label")

one_zero_to_num = StringIndexer(inputCol=input_col,outputCol=input_col+'_label')
one_zero_to_num.setHandleInvalid("skip")

tokenizer = Tokenizer(inputCol="message", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [0]:

# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[one_zero_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [0]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df.select("id", "message", "genre", input_col, "length"))
cleaned = cleaner.transform(data_df.select("id", "message", "genre", input_col, "length"))
#

In [10]:
# Show label and resulting features
cleaned.select(['id', input_col+'_label', 'features']).show(20, False)

+---+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |transport_label|features                                                                                                                                                                                                                                                                                                                                                                                                                                                    

In [0]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

In [12]:
training.show(20, False)

+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+---------+------+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
# training, testing = cleaned.randomSplit([0.6, 0.4])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes(labelCol=input_col+"_label")
nb.setLabelCol=input_col+"_label"
predictor = nb.fit(training)

In [14]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(20, False)

+-----+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+---------+------+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [15]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator(labelCol=input_col+"_label")
acc_eval.setLabelCol(input_col+"_label")
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.930656


In [16]:
test_results.select("id","genre",input_col,"prediction").show()


+-----+------+---------+----------+
|   id| genre|transport|prediction|
+-----+------+---------+----------+
|  100|direct|        0|       0.0|
| 1000|direct|        0|       0.0|
|10002|direct|        0|       0.0|
|10005|direct|        0|       0.0|
|10006|direct|        0|       0.0|
|10013|direct|        0|       0.0|
|10017|direct|        0|       0.0|
| 1002|direct|        0|       0.0|
|10022|direct|        0|       0.0|
|10032|direct|        0|       0.0|
|10034|direct|        0|       0.0|
|10042|direct|        0|       0.0|
|10044|direct|        0|       0.0|
|10046|direct|        0|       0.0|
|10055|direct|        0|       0.0|
| 1006|direct|        0|       0.0|
|10060|direct|        0|       0.0|
|10061|direct|        0|       0.0|
|10075|direct|        0|       0.0|
|10076|direct|        0|       0.0|
+-----+------+---------+----------+
only showing top 20 rows



In [17]:
results_df=test_results.select("id",input_col,"prediction")
results_df=results_df.withColumnRenamed("prediction",input_col+"_prediction").orderBy('id', ascending=True)
results_df.show()

+-----+---------+--------------------+
|   id|transport|transport_prediction|
+-----+---------+--------------------+
|  100|        0|                 0.0|
| 1000|        0|                 0.0|
|10002|        0|                 0.0|
|10005|        0|                 0.0|
|10006|        0|                 0.0|
|10013|        0|                 0.0|
|10017|        0|                 0.0|
| 1002|        0|                 0.0|
|10022|        0|                 0.0|
|10032|        0|                 0.0|
|10034|        0|                 0.0|
|10042|        0|                 0.0|
|10044|        0|                 0.0|
|10046|        0|                 0.0|
|10055|        0|                 0.0|
| 1006|        0|                 0.0|
|10060|        0|                 0.0|
|10061|        0|                 0.0|
|10075|        0|                 0.0|
|10076|        0|                 0.0|
+-----+---------+--------------------+
only showing top 20 rows



In [18]:
from pyspark.sql.types import IntegerType
results_df=test_results.select("id",input_col,"prediction")
results_df=results_df.withColumn("id", results_df["id"].cast(IntegerType()))
results_df=results_df.withColumnRenamed("prediction",input_col+"_prediction").orderBy('id', ascending=True)
results_df.show()

+---+---------+--------------------+
| id|transport|transport_prediction|
+---+---------+--------------------+
| 15|        0|                 0.0|
| 16|        0|                 0.0|
| 20|        1|                 0.0|
| 30|        0|                 0.0|
| 31|        0|                 0.0|
| 34|        0|                 0.0|
| 36|        0|                 0.0|
| 39|        1|                 0.0|
| 45|        0|                 0.0|
| 49|        0|                 0.0|
| 50|        0|                 0.0|
| 57|        0|                 0.0|
| 63|        0|                 0.0|
| 66|        0|                 0.0|
| 67|        0|                 0.0|
| 70|        0|                 0.0|
| 74|        0|                 0.0|
| 86|        0|                 0.0|
| 92|        0|                 0.0|
| 94|        0|                 0.0|
+---+---------+--------------------+
only showing top 20 rows



In [21]:
from google.colab import drive
drive.mount('/drive')


Drive already mounted at /drive; to attempt to forcibly remount, call drive.mount("/drive", force_remount=True).


In [0]:
#results_df.write.csv('/drive/My Drive/Colab Notebooks/data_results/'+input_col,'append')
results_df.toPandas().to_csv('/drive/My Drive/Colab Notebooks/data_results/'+input_col+'.csv')

from google.colab import drive
drive.mount('drive')
