In [1]:
# Import findspark and initialize. 
import findspark
findspark.init()

ModuleNotFoundError: No module named 'findspark'

In [2]:
# Import packages
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType,StructField,StringType, DateType,IntegerType

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [3]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/2/better_netflix_titles.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("better_netflix_titles.csv"), header=True, inferSchema=True)

# Show DataFrame
df.show()

+---+-------+-------+------+--------------------+----------+------------+------+---------+
| id|show_id|   type| title|             country|date_added|release_year|rating| duration|
+---+-------+-------+------+--------------------+----------+------------+------+---------+
|  0|     s1|TV Show|    3%|              Brazil| 14-Aug-20|        2020| TV-MA|4 Seasons|
|  1|     s2|  Movie|  7:19|              Mexico| 23-Dec-16|        2016| TV-MA|   93 min|
|  2|     s3|  Movie| 23:59|           Singapore| 20-Dec-18|        2011|     R|   78 min|
|  3|     s4|  Movie|     9|       United States| 16-Nov-17|        2009| PG-13|   80 min|
|  4|     s5|  Movie|    21|       United States|  1-Jan-20|        2008| PG-13|  123 min|
|  5|     s6|TV Show|    46|              Turkey|  1-Jul-17|        2016| TV-MA| 1 Season|
|  6|     s7|  Movie|   122|               Egypt|  1-Jun-20|        2019| TV-MA|   95 min|
|  7|     s8|  Movie|   187|       United States|  1-Nov-19|        1997|     R|  119 min|

In [4]:
# Create our temporary view
df.createOrReplaceTempView('movies')

In [5]:
# We can perform most any SQL action at this point
# here we are converting the date to a more workable date object
#NOTE: since we are not assigning this to a dataframe the change is not saved.
spark.sql("""SELECT show_id, 
   type, 
   title, 
   country, 
   TO_DATE(date_added, 'MMMM d, yyyy') 
   AS date_added, 
   release_year, 
   rating, 
   duration 
   FROM movies 
   WHERE date_added IS NOT null AND type='Movie'""").show(10)

+-------+-----+-----+-------------+----------+------------+------+--------+
|show_id| type|title|      country|date_added|release_year|rating|duration|
+-------+-----+-----+-------------+----------+------------+------+--------+
|     s2|Movie| 7:19|       Mexico|      null|        2016| TV-MA|  93 min|
|     s3|Movie|23:59|    Singapore|      null|        2011|     R|  78 min|
|     s4|Movie|    9|United States|      null|        2009| PG-13|  80 min|
|     s5|Movie|   21|United States|      null|        2008| PG-13| 123 min|
|     s7|Movie|  122|        Egypt|      null|        2019| TV-MA|  95 min|
|     s8|Movie|  187|United States|      null|        1997|     R| 119 min|
|     s9|Movie|  706|        India|      null|        2019| TV-14| 118 min|
|    s10|Movie| 1920|        India|      null|        2008| TV-MA| 143 min|
|    s11|Movie| 1922|United States|      null|        2017| TV-MA| 103 min|
|    s14|Movie|2,215|     Thailand|      null|        2018| TV-MA|  89 min|
+-------+---

In [6]:
# All of the SQL you learned in Unit 6 is available to you in Spark SQL
# Here we are listing out the counts by rating
# NOTE: it is almost NEVER a good idea to "order by" when using Spark with large datasets (more on this in 8.2)
spark.sql("""
  SELECT
    rating,
    count(*) AS number_of_ratings
  FROM movies
  GROUP BY rating
  ORDER BY 2 DESC
  """).show()

+--------+-----------------+
|  rating|number_of_ratings|
+--------+-----------------+
|   TV-MA|             2863|
|   TV-14|             1931|
|   TV-PG|              805|
|       R|              665|
|   PG-13|              386|
|    TV-Y|              280|
|   TV-Y7|              271|
|      PG|              247|
|    TV-G|              194|
|      NR|               84|
|       G|               39|
|    null|                9|
|TV-Y7-FV|                6|
|      UR|                5|
|   NC-17|                3|
+--------+-----------------+



In [7]:
# Let's output a file with just listing for children
# first we will use our spark sql to write to a dataframe

out_df= spark.sql("""
  SELECT 
  title,
  rating,
  date_added,
  duration
  FROM Movies
  WHERE rating IN ('G','PG', 'PG-13')""")

# Make sure we got what we wanted
out_df.show()

+--------------------+------+----------+--------+
|               title|rating|date_added|duration|
+--------------------+------+----------+--------+
|                   9| PG-13| 16-Nov-17|  80 min|
|                  21| PG-13|  1-Jan-20| 123 min|
|            Æon Flux| PG-13|  1-Feb-18|  93 min|
|         10,000 B.C.| PG-13|  1-Jun-19| 109 min|
|           16 Blocks| PG-13|  1-Nov-19| 102 min|
|            17 Again| PG-13|  1-Jan-21| 102 min|
|20 Feet From Stardom| PG-13| 22-Sep-18|  91 min|
|             28 Days| PG-13| 30-Sep-20| 104 min|
|      3 Days to Kill| PG-13|  1-Dec-20| 117 min|
|       3 Generations| PG-13| 28-Aug-17|  92 min|
|            3 Idiots| PG-13|  1-Aug-19| 164 min|
|        5 Flights Up| PG-13| 17-Mar-19|  92 min|
|      50 First Dates| PG-13|  1-Dec-20|  99 min|
|        A 2nd Chance|    PG|  1-Jul-17|  95 min|
|     A Boy Called Po|    PG| 15-Jan-18|  94 min|
|    A Bridge Too Far|    PG|  1-Jul-20| 176 min|
|A California Chri...| PG-13| 14-Dec-20| 107 min|


In [8]:
#  As Spark stores the data in partitions, it will also write data in partitions.
#  These partitions will always be stored in a folder with the same name as the file, and that folder may often contain many subfolders or files.
#  Within the partition folder, there will be a file or files that starts with `part-`, these are CSV files. 
# However, they are often not optimal for friendly reading, but can be downloaded to your computer.

out_df.write.csv('movies_out_spark.csv')

Py4JJavaError: An error occurred while calling o35.csv.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:288)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:851)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 7) (TheBigLaptop.lan executor driver): java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\katel\22-Big-Data\2\Activities\07-Ins_SparkSQL\Solved\movies_out_spark.csv\_temporary\0\_temporary\attempt_202305222050507538817350095491311_0008_m_000000_7\part-00000-51436520-e2b2-4b9f-92d5-2ec8750e34f4-c000.csv
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CsvOutputWriter.scala:38)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:84)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:161)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:146)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:327)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$22(FileFormatWriter.scala:266)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1623)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:255)
	... 40 more
Caused by: java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\katel\22-Big-Data\2\Activities\07-Ins_SparkSQL\Solved\movies_out_spark.csv\_temporary\0\_temporary\attempt_202305222050507538817350095491311_0008_m_000000_7\part-00000-51436520-e2b2-4b9f-92d5-2ec8750e34f4-c000.csv
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CsvOutputWriter.scala:38)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:84)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:161)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:146)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:327)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$22(FileFormatWriter.scala:266)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more


In [9]:
# The easiest work around of the part file output is to take the data to Pandas and write out a CSV.
# This forces the data to the master node and is not recommended unless you have filtered and/or aggregated your data to a reasonable size.

out_df.toPandas().to_csv('movies_out_pandas.csv')