In [3]:
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql.functions import col , avg , count, stddev_pop, length, replace, size , split , regexp_replace
import os

In [10]:
# Set Google Cloud credentials
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r"C:\Users\hi\Downloads\vijay-410011-a543addd551c.json"

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Configure Hadoop settings for GCS
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

# Read CSV file from GCS
df = spark.read.option("inferSchema", "true").option("header", "true").csv("gs://spark_data1/amazon_reviews.csv")

df.show()
df.cache()  

+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      asin|               title|              imgUrl|          productURL|               stars|             reviews|               price|           listPrice|        categoryName|        isBestSeller|   boughtInLastMonth|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B0C154VR2L|"SAMSUNG Smart TV...| Samsung Gaming Hub"|https://m.media-a...|https://www.amazo...|                 4.6|                 269|             2279.05|                 0.0|TV, Áudio e Cinem...|               False|
|B0C85J5HYD|Mini Projetor Por...|https://m.media-a...|https://www.amazo...|                 4.5|            

asin,title,imgUrl,productURL,stars,reviews,price,listPrice,categoryName,isBestSeller,boughtInLastMonth
B0C154VR2L,"""SAMSUNG Smart TV...","Samsung Gaming Hub""",https://m.media-a...,https://www.amazo...,4.6,269,2279.05,0.0,"TV, Áudio e Cinem...",False
B0C85J5HYD,Mini Projetor Por...,https://m.media-a...,https://www.amazo...,4.5,816,409.99,0.0,TV Áudio e Cinema...,True,5000
B0C1538ZJ4,"""Samsung Smart TV...","Samsung Gaming Hub""",https://m.media-a...,https://www.amazo...,4.7,638,1969.0,2499.0,"TV, Áudio e Cinem...",False
B0B4V379PC,Samsung 60BU8000 ...,https://m.media-a...,https://www.amazo...,4.6,675,3099.0,0.0,"TV, Áudio e Cinem...",False,0
B0899BRB4B,Smart TV LED 32''...,https://m.media-a...,https://www.amazo...,4.7,3945,1079.9,1369.0,"TV, Áudio e Cinem...",True,0
B0BTF68G5R,Samsung Q90T - Sm...,4K,UHD,120Hz,Alexa built in,Processador com IA,"Tela sem limites""",https://m.media-a...,https://www.amazo...,4.6
B0C6NL4QK3,"""Smart TV 43"""" 4K...",https://m.media-a...,https://www.amazo...,4.5,111,1899.0,2849.0,"TV, Áudio e Cinem...",False,0
B08B14TSHS,"""Samsung UN32T430...",Wifi,HDMI,"USB""",https://m.media-a...,https://www.amazo...,4.7,2480,1071.0,1369.0
B0899GJHWR,"""Smart TV LED 43""...",2 HDMI,1 USB,Wi-Fi,HDR,Sistema Operacio...,https://m.media-a...,https://www.amazo...,4.7,517
B09KNTBKMP,Roku Express - St...,https://m.media-a...,https://www.amazo...,4.7,8391,212.0,222.9,"TV, Áudio e Cinem...",False,0


In [5]:
avg_column = df.select(avg("reviews")).show()


+------------------+
|      avg(reviews)|
+------------------+
|219.84163272298773|
+------------------+



In [6]:
std_column = df.select(stddev_pop("price")).show()


+-----------------+
|stddev_pop(price)|
+-----------------+
|833.4749624361606|
+-----------------+



In [7]:
Frequent_values = df.groupBy("categoryName").count().orderBy("count", ascending = False).limit(5).show()


+--------------------+-----+
|        categoryName|count|
+--------------------+-----+
|               Bebês|32007|
|         Eletrônicos|31034|
|            Mochilas|19051|
|Produtos de Cuida...|19010|
|Instrumentos para...|19004|
+--------------------+-----+



In [8]:
df = df.withColumnRenamed("price", "product_price").show()


+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      asin|               title|              imgUrl|          productURL|               stars|             reviews|       product_price|           listPrice|        categoryName|        isBestSeller|   boughtInLastMonth|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B0C154VR2L|"SAMSUNG Smart TV...| Samsung Gaming Hub"|https://m.media-a...|https://www.amazo...|                 4.6|                 269|             2279.05|                 0.0|TV, Áudio e Cinem...|               False|
|B0C85J5HYD|Mini Projetor Por...|https://m.media-a...|https://www.amazo...|                 4.5|            

In [11]:
df_summary = df.summary("min", "25%", "75%", "mean").show()


+-------+-------------------+--------------------+------------------+------------------+-----------------+------------------+--------------------+-----------------+------------------+-----------------+------------------+
|summary|               asin|               title|            imgUrl|        productURL|            stars|           reviews|               price|        listPrice|      categoryName|     isBestSeller| boughtInLastMonth|
+-------+-------------------+--------------------+------------------+------------------+-----------------+------------------+--------------------+-----------------+------------------+-----------------+------------------+
|    min|         0000003956| Blocos de Montar...|            Micron|            Micron|                 |           & Games| (PS5 NOT Included)"|                0|             1 USB| 127V (NACIONAL)"|           16G RAM|
|    25%|      6.555510463E9|                23.0|               1.0|              76.0|              0.0|          

In [12]:
word_count = df.withColumn("word_count",(length(col("categoryName")) - length(regexp_replace(col("categoryName"), " ", ""))+1))

word_count.select("asin","categoryName", "word_count").show()


+----------+--------------------+----------+
|      asin|        categoryName|word_count|
+----------+--------------------+----------+
|B0C154VR2L|                 0.0|         1|
|B0C85J5HYD|TV Áudio e Cinema...|         6|
|B0C1538ZJ4|              2499.0|         1|
|B0B4V379PC|TV, Áudio e Cinem...|         6|
|B0899BRB4B|TV, Áudio e Cinem...|         6|
|B0BTF68G5R|https://m.media-a...|         1|
|B0C6NL4QK3|TV, Áudio e Cinem...|         6|
|B08B14TSHS|                2480|         1|
|B0899GJHWR|https://www.amazo...|         1|
|B09KNTBKMP|TV, Áudio e Cinem...|         6|
|B0CC5RRGZ9|TV, Áudio e Cinem...|         6|
|B088C4QZV2|TV, Áudio e Cinem...|         6|
|B0B5HJYL3B|TV, Áudio e Cinem...|         6|
|B0C153ML9K|             2861.24|         1|
|B0C7W863T2|TV, Áudio e Cinem...|         6|
|B0C4ZN3PPS|TV, Áudio e Cinem...|         6|
|B0C15423S3|              2399.0|         1|
|B0C14ZM9MQ|              3496.0|         1|
|B0BW19LGZR|TV, Áudio e Cinem...|         6|
|B0C6NJLG7

In [15]:
df1 = spark.createDataFrame([ ("A", 1, None), ("B", None, "123" ), ("B", 3, "456"), ("D", None, None), ], ["Name", "Value", "id"])
df1.show() 

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|NULL|
|   B| NULL| 123|
|   B|    3| 456|
|   D| NULL|NULL|
+----+-----+----+



In [16]:
drop_df = df1.na.drop(subset=["id" , "value"]).show()

+----+-----+---+
|Name|Value| id|
+----+-----+---+
|   B|    3|456|
+----+-----+---+



+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      asin|               title|              imgUrl|          productURL|               stars|             reviews|               price|           listPrice|        categoryName|        isBestSeller|   boughtInLastMonth|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B0C154VR2L|"SAMSUNG Smart TV...| Samsung Gaming Hub"|https://m.media-a...|https://www.amazo...|                 4.6|                 269|             2279.05|                 0.0|TV, Áudio e Cinem...|               False|
|B0C85J5HYD|Mini Projetor Por...|https://m.media-a...|https://www.amazo...|                 4.5|            

In [2]:
from pyspark.sql import SparkSession
import os
# Set up Spark session
spark = SparkSession.builder.getOrCreate()
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r"C:\Users\hi\Downloads\vijay-410011-a98c6db25f77.json"

# Read data from GCS and create DataFrame
df = spark.read.option("inferSchema", "true").option("header", "true").csv("gs://spark_data1/amazon_reviews.csv")

# Show the DataFrame schema
df.printSchema()

# Show the first few rows of the DataFrame
df.show()

In [21]:
from pyspark.sql import SparkSession
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r"C:\Users\hi\Downloads\vijay-410011-a543addd551c.json"
spark = SparkSession.builder.appName('GCSFilesRead').getOrCreate()

spark.conf.set("google.cloud.auth.service.account.enable", "true")
spark.conf.set("google.cloud.auth.service.account.json.keyfile", r"C:\Users\hi\Downloads\vijay-410011-a543addd551c.json")
spark.conf.set("fs.gs.auth.type", "SERVICE_ACCOUNT_JSON_KEYFILE")

bucket_name="gs://spark_data1"
path="gs://spark_data1/amazon_reviews.csv"

df  = spark.read.option("header" , "true").csv(path)

Py4JJavaError: An error occurred while calling o157.csv.
: java.lang.IllegalStateException: No valid credential configuration discovered.
	at com.google.cloud.hadoop.util.CredentialConfiguration.getCredential(CredentialConfiguration.java:160)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getCredential(GoogleHadoopFileSystemBase.java:1613)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.createGcsFs(GoogleHadoopFileSystemBase.java:1699)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1658)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:683)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:646)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:724)
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:722)
	at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:551)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:404)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:538)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)
