In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, sum

In [None]:
# Start (or reuse) a Spark session on the local master 'local[16]' and keep a
# handle to its SparkContext for RDD-level work.
spark = (
    SparkSession.builder
    .master('local[16]')
    .getOrCreate()
)
sc = spark.sparkContext

# Disabled smoke test: sum of the integers 0..100000 computed through an RDD.
# rdd = sc.parallelize(range(100000 + 1))
# rdd.sum()

In [None]:
# Load gun_data.csv, treating the first line as the header and letting Spark
# infer each column's type from the data.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('gun_data.csv')
)

In [None]:
# Peek at the first record of the loaded data.
first_row = df.head()
print(first_row)

In [None]:
# List the column names, in schema order.
df.schema.names

In [None]:
# Discard every row that has a null in at least one column.
# NOTE(review): this is an all-columns check, so sparsely populated columns
# can remove a large share of the data -- confirm that is intended.
df = df.dropna()

In [None]:
# Total rows of data remaining in df at this point (i.e. after the rows
# containing nulls were dropped in the previous step).
df.count()

In [None]:
# Cast n_killed to an integer column so it can be aggregated numerically.
from pyspark.sql.functions import col, sum
n_killed_as_int = col('n_killed').cast('integer')
df = df.withColumn('n_killed', n_killed_as_int)

In [None]:
# Grand total of n_killed over the whole DataFrame (no grouping keys).
df.agg({'n_killed': 'sum'}).show()

In [None]:
# Number of rows per state, most frequent state first.
(
    df.groupBy('state')
    .count()
    .orderBy(col('count').desc())
    .show()
)

In [None]:
# Total n_killed per state, highest total first.
(
    df.groupBy('state')
    .sum('n_killed')
    .orderBy(col('sum(n_killed)').desc())
    .show()
)

In [None]:
# Derive a 'year' column holding the calendar year of each row's 'date' value.
from pyspark.sql.functions import year
incident_year = year(col('date'))
df = df.withColumn('year', incident_year)

In [None]:
# Number of rows per year, most frequent year first.
(
    df.groupBy('year')
    .count()
    .orderBy(col('count').desc())
    .show()
)

In [None]:
# Total n_killed per year, highest total first.
(
    df.groupBy('year')
    .sum('n_killed')
    .orderBy(col('sum(n_killed)').desc())
    .show()
)

In [None]:
# Display the DataFrame schema as a tree.
# printSchema() writes the tree to stdout itself and returns None, so it must
# not be wrapped in print() -- doing so emits a stray "None" line afterwards.
df.printSchema()

In [None]:
# Run k means clustering on longitude and latitude
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import ClusteringEvaluator

In [None]:
# Preview the longitude column (show() prints the first 20 rows by default).
df.select(col('longitude')).show()

In [None]:
# Cast longitude to a float column (latitude is handled separately).
longitude_as_float = df['longitude'].cast('float')
df = df.withColumn('longitude', longitude_as_float)

In [None]:
# Cast latitude to a float column.
latitude_as_float = df['latitude'].cast('float')
df = df.withColumn('latitude', latitude_as_float)

In [None]:
# Build the assembler that packs longitude and latitude into a single
# 'features' vector column.
assembler = (
    VectorAssembler()
    .setInputCols(['longitude', 'latitude'])
    .setOutputCol('features')
)

In [None]:
# Run the assembler over df so each row gains the assembled vector column.
df_kmeans = assembler.transform(dataset=df)

# Configure a two-cluster k-means estimator with a fixed seed.
kmeans = KMeans(k=2, seed=1)

In [None]:
# Keep only rows with no nulls in any column and whose longitude and latitude
# are both non-zero.
# NOTE(review): a zero coordinate is presumably a placeholder for a missing
# value -- confirm against the data source.
df_kmeans = (
    df_kmeans
    .dropna()
    .where((col('longitude') != 0) & (col('latitude') != 0))
)

In [None]:
# Train k-means on a seeded sample (fraction 0.1, without replacement) of the
# filtered rows rather than on the full data.
training_sample = df_kmeans.sample(withReplacement=False, fraction=0.1, seed=42)
model = kmeans.fit(training_sample)

In [None]:
# Make predictions: apply the fitted model to all of df_kmeans (the model
# itself was fitted on a 10% sample of it), producing a DataFrame with the
# model's cluster assignment added for each row.
predictions = model.transform(df_kmeans)


In [None]:
# Score the clustering with a ClusteringEvaluator left at its default
# settings; the score is only computed here, not displayed.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(dataset=predictions)

In [None]:
# Report the silhouette score computed by the evaluator.
print(f"Silhouette with squared euclidean distance = {silhouette}")

In [None]:
# Report the fitted cluster centres, one per line.
centers = model.clusterCenters()
print("Cluster Centers: ")
for centroid in centers:
    print(centroid)

In [None]:
# Preview the first 20 prediction rows, truncating long cell values
# (these are show()'s defaults, spelled out explicitly).
predictions.show(n=20, truncate=True)