In [None]:
# test spark to see if it is running correctly
import findspark
findspark.init('spark')
import pyspark
import random
sc = pyspark.SparkContext(appName="Pi")
num_samples = 100000000
def inside(p):     
  x, y = random.random(), random.random()
  return x*x + y*y < 1
count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print(pi)
sc.stop()

In [1]:
# import basic libraries
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

In [2]:
# initiate SparkContext -- step 1
# The first step in using Spark is connecting to a cluster.
# The master is connected to the rest of the computers in the cluster, which are called worker. 
# The master sends the workers data and calculations to run, and they send their results back to the master.
import findspark
findspark.init('spark')
import pyspark
import random
sc = pyspark.SparkContext(appName="player_data")

In [3]:
# verify sparkcontext and version
print(sc)
print(sc.version)

<SparkContext master=local[*] appName=player_data>
2.4.3


In [4]:
# Spark's core data structure is the Resilient Distributed Dataset (RDD). 
# This is a low level object that lets Spark work its magic by splitting data across multiple nodes in the cluster. 
# However, RDDs are hard to work with directly, so you'll be using the Spark DataFrame abstraction built on top of RDDs.
# To start working with Spark DataFrames, you first have to create a SparkSession object from your SparkContext. 
# You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface with that connection.

from pyspark.sql import SparkSession
my_spark = SparkSession.builder.getOrCreate()
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x7fd46bb94240>


In [5]:
import gcsfs
fs = gcsfs.GCSFileSystem(project='data-flow-test-248722')
fs.ls('py_spark_ds')
with fs.open('py_spark_ds/player_info.csv') as f:
    pd_temp = pd.read_csv(f)

In [6]:
s_df = my_spark.createDataFrame(pd_temp)

In [7]:
type(s_df)

pyspark.sql.dataframe.DataFrame

In [8]:
for col in s_df.columns:
  s_df = s_df.withColumnRenamed(col,col.replace(".", "_"))

In [9]:
for col in s_df.columns:
  s_df = s_df.withColumnRenamed(col,col.replace(" ", "_"))

In [10]:
for col in s_df.columns:
  s_df = s_df.withColumnRenamed(col,col.replace("-", "_"))

In [11]:
s_df.createOrReplaceTempView("players")

In [12]:
print(my_spark.catalog.listTables())

[Table(name='players', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [13]:
# Don't change this query
query = "FROM players SELECT * LIMIT 2"

In [14]:
# run query
players2 = my_spark.sql(query)
# Show the results
players2.show()

+----------+---------------+------------+----------+------+------------+---+--------------+-----------------+----------------------+----------+---------+---------+---------+---------+------------+-----------------+------------------+-------------+------------------+-------------------+-----------+--------------------+----------+---------+------------------+---------------+------------+--------------+------------------+-----------------+---------------+------------+--------------+-----------------+----------------+---------+------------+----------+---------+------+-----------+---------------+------+---------------+------------------+----------------+------------------+-----------------+-----------------+--------------------+------------------+--------------------+-------------------+----------------+-------------------+-----------------+-------------------+------------------+-------------+----------------+--------------+----------------+---------------+---------------+------------------

In [20]:
# Don't change this query
query = "SELECT name,SUM(deaths) FROM players GROUP BY name LIMIT 5"

# Run the query
player_counts = my_spark.sql(query)

# Convert the results to a pandas DataFrame
death_counts = player_counts.toPandas()

# Print the head of pd_counts
death_counts.head()

Unnamed: 0,name,sum(deaths)
0,SK1RON1N,120548
1,lmmune Crown,126072
2,iUncleBadTouchx,72584
3,TastefulClover8,177857
4,XENOCIDER XIII,198874


In [21]:
col_names = s_df.columns

In [22]:
col_names = pd.DataFrame(col_names, columns=['title'])

In [23]:
col_names[250:300]

Unnamed: 0,title
250,SIDEARM_stat_shots
251,SIDEARM_stat_time
252,Infantry_Fighting_Vehicle_extra_kpm
253,Infantry_Fighting_Vehicle_extra_spm
254,Infantry_Fighting_Vehicle_stat_destroys
255,Infantry_Fighting_Vehicle_stat_kills
256,Infantry_Fighting_Vehicle_stat_time
257,Anti_Air_extra_kpm
258,Anti_Air_extra_spm
259,Anti_Air_stat_destroys


In [24]:
col_names = col_names[1:]
col_names.tail()

Unnamed: 0,title
390,AA_LAV_AD_extra_spm
391,AA_LAV_AD_stat_destroys
392,AA_LAV_AD_stat_kills
393,AA_LAV_AD_stat_score
394,AA_LAV_AD_stat_time


In [25]:
from pyspark.sql.functions import col, countDistinct
newDF = pd.DataFrame()

for index, row in col_names.iterrows():
    query = "SELECT COUNT (DISTINCT "+ str(row[0]) + ") as count FROM players "
    tmp1 = my_spark.sql(query)
    tmp2 = tmp1.toPandas()
    tmp2['index'] = int(index)
    newDF = newDF.append(tmp2)

newDF = newDF.set_index('index').join(df_col)
newDF = newDF[['title','count']]

Py4JJavaError: An error occurred while calling o2695.collectToPython.
: java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3332)
	at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
	at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
	at java.lang.StringBuilder.append(StringBuilder.java:136)
	at scala.StringContext.standardInterpolator(StringContext.scala:126)
	at scala.StringContext.s(StringContext.scala:95)
	at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:200)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:75)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3254)
	at sun.reflect.GeneratedMethodAccessor68.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
newDF

In [None]:
sc.stop()