In [1]:
from pyspark.sql import SparkSession
from operator import add


# Spark session
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.218:7077") \
        .appName("Scalingtest")\
        .config("spark.executor.cores",2)\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled", True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .config("spark.executor.memory","2g")\
        .getOrCreate()

# Spark context
spark_context = spark_session.sparkContext

spark_context.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/10 11:53:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/03/10 11:54:01 WARN Utils: Service 'sparkDriver' could not bind on port 9998. Attempting port 9999.
22/03/10 11:54:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/03/10 11:54:03 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10005. Attempting port 10006.
22/03/10 11:54:04 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.


In [2]:
#loop to append file paths for each parent directory
root ='hdfs://192.168.2.200:9000/user/ubuntu/lastfm_train/'
letters = ['A','B','C','E','G','J','K','L','M','N','O','P','Q','R','S','T','U','W','X','Y','Z']
paths=[]
for i in letters:
    paths.append(root+i)
paths[-1]

'hdfs://192.168.2.200:9000/user/ubuntu/lastfm_train/Z'

In [3]:
#starting timing and run all cells below to measure wall time
import time
start = time.time()
i = 0
#read in files from parent directories choose how may by slicing the paths list in the loop
data = spark_session.read.option("recursiveFileLookup", "true").json(paths[i]).cache()

#data = spark_session.read.json('hdfs://192.168.2.200:9000/user/ubuntu/lastfm_train/B/A/A/*.json')

                                                                                

In [4]:
# Flatten the tags column
from pyspark.sql.functions import flatten
dataframe = data.withColumn("New tags", flatten(data.tags))

In [5]:
# Remove the numbers from the tags (the numbers are every other element)
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf, col

removeNumbers = udf(lambda lst: lst[0::2], ArrayType(StringType()))

dataFrame = dataframe.withColumn("Only tags", removeNumbers(col("New tags")))

In [6]:
import re
# Function for determine if a song has been tagged with a tag including "male" or "female" or both
def genderTags(lst):
    genderList = []
    for element in lst:
        if re.search("female|Female", element) and "female" not in genderList:
            genderList.append("female")
            continue
        elif re.search("male|Male", element) and "male" not in genderList:
            genderList.append("male")
            continue
        
    return genderList

# Create udf from the above function
genderTagUDF = udf(genderTags, ArrayType(StringType()))

# Create a new column representing wether a song has been tagged with "female", "male" or both
dataframe = dataFrame.withColumn("Gender tag", genderTagUDF(col("Only tags")))
        

In [7]:
# Function for removing the gender tag from the tags
def removeGenderTags(lst):
    tags = []
    for element in lst:
        if re.search("female|Female", element) or re.search("male|Male", element):
            continue
        else:
            tags.append(element)
        
        
    return tags

# Create udf from the above function
removeGenderTagUDF = udf(removeGenderTags, ArrayType(StringType()))

# Create a new column where the"female" and "male" tags have been removed from the other tags
filteredDF = dataframe.withColumn("Tags", removeGenderTagUDF(col("Only tags")))

In [8]:
# Create new dataframe with only the tags and the gender tag
genderDataFrame = filteredDF.select("Tags", "Gender tag")

In [9]:
# Explode the tags column
from pyspark.sql.functions import explode
genderDataFrame = genderDataFrame.select(genderDataFrame["Gender tag"], explode(genderDataFrame["Tags"]))
genderDataFrame = genderDataFrame.withColumnRenamed("col", "Tag")

In [10]:
# Explode the gender tag column
genderDataFrame = genderDataFrame.select(explode(genderDataFrame["Gender tag"]), genderDataFrame["Tag"])
genderDataFrame = genderDataFrame.withColumnRenamed("col", "Gender")


In [11]:
# Find the most frequent tags for "female"
tagsFemale = genderDataFrame.select("Tag").filter(genderDataFrame["Gender"] == "female").groupBy("Tag").agg({"Tag": "count"})\
        .withColumnRenamed("count(Tag)","Frequency").orderBy("Frequency", ascending=False)

tagsFemale.show()

[Stage 4:>                                                          (0 + 3) / 3]

+-----------------+---------+
|              Tag|Frequency|
+-----------------+---------+
|              pop|      788|
|             rock|      553|
|      alternative|      416|
|             Love|      408|
|        favorites|      385|
|            dance|      382|
|singer-songwriter|      365|
|              00s|      347|
|        beautiful|      327|
|            indie|      320|
|       electronic|      313|
|           Mellow|      299|
|             soul|      279|
|         chillout|      276|
|             sexy|      275|
|            chill|      229|
|              90s|      225|
|             jazz|      219|
|             folk|      218|
|         american|      212|
+-----------------+---------+
only showing top 20 rows



                                                                                

In [12]:
# Find the most frequent tags for "male"
tagsMale = genderDataFrame.select("Tag").filter(genderDataFrame["Gender"] == "male").groupBy("Tag").agg({"Tag": "count"})\
        .withColumnRenamed("count(Tag)","Frequency").orderBy("Frequency", ascending=False)

print(tagsMale.show())
print()
print(f'execution time in minutes: {(time.time()-start)/60}')





+-----------------+---------+
|              Tag|Frequency|
+-----------------+---------+
|             rock|     1263|
|              pop|     1235|
|        favorites|      835|
|      alternative|      818|
|             Love|      774|
|singer-songwriter|      621|
|        beautiful|      607|
|            indie|      594|
|              00s|      580|
|           Mellow|      536|
|         american|      536|
|          Awesome|      480|
|            dance|      468|
| alternative rock|      456|
|           oldies|      439|
|     classic rock|      425|
|            chill|      418|
|         Favorite|      404|
|             soul|      398|
|         chillout|      391|
+-----------------+---------+
only showing top 20 rows

None

execution time in minutes: 7.535582725207011


                                                                                

In [15]:

tagsFemale.toPandas.to_csv(f"/home/ubuntu/Project/Urlich/results/2workers/{letters[i]}_female.csv")
tagsMale.toPandas.to_csv(f"/home/ubuntu/Project/Urlich/results/2workers/{letters[i]}_male.csv")

22/03/10 12:08:31 ERROR FileOutputCommitter: Mkdirs failed to create file:/results/2workers/E.csv/_temporary/0
22/03/10 12:08:47 ERROR TaskSetManager: Task 0 in stage 18.0 failed 4 times; aborting job
22/03/10 12:08:47 ERROR FileFormatWriter: Aborting job fba7e5ee-4d24-449b-b385-a1397e611782.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 4 times, most recent failure: Lost task 0.3 in stage 18.0 (TID 15067) (192.168.2.152 executor 2): java.io.IOException: Mkdirs failed to create file:/results/2workers/E.csv/_temporary/0/_temporary/attempt_202203101208461161139549298688306_0018_m_000000_15067 (exists=false, cwd=file:/home/ubuntu/spark-3.2.1-bin-hadoop3.2/work/app-20220310115403-0080/2)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:515)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSyst

Py4JJavaError: An error occurred while calling o223.csv.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:496)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:251)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:839)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 4 times, most recent failure: Lost task 0.3 in stage 18.0 (TID 15067) (192.168.2.152 executor 2): java.io.IOException: Mkdirs failed to create file:/results/2workers/E.csv/_temporary/0/_temporary/attempt_202203101208461161139549298688306_0018_m_000000_15067 (exists=false, cwd=file:/home/ubuntu/spark-3.2.1-bin-hadoop3.2/work/app-20220310115403-0080/2)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:515)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CsvOutputWriter.scala:38)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:161)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:146)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:290)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:218)
	... 42 more
Caused by: java.io.IOException: Mkdirs failed to create file:/results/2workers/E.csv/_temporary/0/_temporary/attempt_202203101208461161139549298688306_0018_m_000000_15067 (exists=false, cwd=file:/home/ubuntu/spark-3.2.1-bin-hadoop3.2/work/app-20220310115403-0080/2)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:515)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CsvOutputWriter.scala:38)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:161)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:146)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:290)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [1]:
spark_session.stop()

NameError: name 'spark_session' is not defined