# To Script

In [1]:
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="credentials/credentials-gs.json"

# https://kashif-sohail.medium.com/read-files-from-google-cloud-storage-bucket-using-local-pyspark-and-jupyter-notebooks-f8bd43f4b42e

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType, ArrayType, LongType
from pyspark.sql import DataFrameWriter

from google.cloud import storage

spark = SparkSession \
  .builder \
  .appName('spark-ETL-Tweets') \
  .getOrCreate()


spark._jsc.hadoopConfiguration().set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
project = spark._jsc.hadoopConfiguration().get('fs.gs.project.id')
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

BUCKET = 'cloud-computing-2122-bjr'
BUCKET_LINK = 'gs://cloud-computing-2122-bjr'

22/04/10 11:54:49 WARN Utils: Your hostname, OutOne resolves to a loopback address: 127.0.1.1; using 192.168.1.230 instead (on interface wlp8s0)
22/04/10 11:54:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/10 11:54:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
sc = SparkContext.getOrCreate()
print(spark.sparkContext._jsc.sc().listJars())

Vector()


In [3]:
csv_file = 'banana'
READFILE_SCHEMA = StructType([StructField('csvfile', StringType(), False)])

In [4]:
df = spark.createDataFrame([[csv_file]], READFILE_SCHEMA)

In [5]:
df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+-------+
|csvfile|
+-------+
| banana|
+-------+



                                                                                

## 0: Check which CSV's havent been read

https://cloud.google.com/storage/docs/downloading-objects#storage-download-object-portion-python

Need: 
* To Have such .txt in bucket
* Pull .txt
* Check against list of csv's in bucket

In [3]:
# Read txt file of already processed CSV's
def read_txt_blob(bucket_name, destination_file_name):
    # https://stackoverflow.com/questions/48279061/gcs-read-a-text-file-from-google-cloud-storage-directly-into-python
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.get_blob(destination_file_name)
    read_file = blob.download_as_text(encoding="utf-8")
    # Log instead of print
    print(
        "Loaded from bucket {} to local file {}.".format(
            bucket_name, destination_file_name
        )
    )
    return read_file.split('\n')

FILES_ALREADY_READ = read_txt_blob(BUCKET, 'control/read_files.txt')

Loaded from bucket cloud-computing-2122-bjr to local file control/read_files.txt.


In [4]:
FILES_ALREADY_READ

['220312.csv']

In [5]:
# List all available data files
def list_blobs_with_prefix(bucket_name, prefix):
    # https://cloud.google.com/storage/docs/listing-objects#storage-list-objects-python
    storage_client = storage.Client()

    # Note: Client.list_blobs requires at least package version 1.17.0.
    blobs = storage_client.list_blobs(bucket_name, prefix=prefix)
    list_of_csvs = []
    """
    print("Blobs:")
    """
    for blob in blobs:
        list_of_csvs.append(blob.name.split("/")[-1])
        # print(blob.name)
    return list_of_csvs

DATA_IN_BUCKET = list_blobs_with_prefix(BUCKET, 'data/')

In [6]:
FILES_TO_PROCESS = [f'data/{FILE}' for FILE in DATA_IN_BUCKET if FILE not in FILES_ALREADY_READ]

In [7]:
FILES_TO_PROCESS

['data/220312-subset.csv']

## 1: Read CSV into Spark

In [8]:
## print(f"{BUCKET_LINK}/{csv_file}")

### 1.1: Define Schema of Global Data

https://sparkbyexamples.com/pyspark/pyspark-structtype-and-structfield/   
https://spark.apache.org/docs/latest/sql-ref-datatypes.html

In [8]:
GLOBAL_SCHEMA = StructType([ \
    StructField("RowID",IntegerType(),False), \
    StructField("userId",LongType(),False), \
    StructField("username",StringType(),False), \
    StructField("acctdesc",StringType(),True), \
    StructField("location", StringType(), True), \
    StructField("n_following", IntegerType(), True), \
    StructField("n_followers", IntegerType(), True), \
    StructField("n_totaltweets", IntegerType(), True), \
    StructField("usercreatedts", TimestampType(), False), \
    StructField("tweetId", LongType(), False), \
    StructField("tweetcreatedts", TimestampType(), False), \
    StructField("retweetcount", IntegerType(), False), \
    StructField("text", StringType(), True), \
    StructField("hashtags", StringType(), False), \
    StructField("language", StringType(), True), \
    StructField("coordinates", StringType(), True), \
    StructField("favorite_count", IntegerType(), True), \
    StructField("extractedts", TimestampType(), False) \
  ])

# StructField("hashtags", StructType([StructField('text', StringType(), False), \
# StructField('indices', ArrayType(IntegerType()), False)]),False), \

### 1.2 Define Schema of Users DB

Has to be in accordance with BigQuery Table

In [37]:
USERS_SCHEMA = StructType([ \
    StructField("userId",LongType(),False), \
    StructField("usercreatedts",TimestampType(),False), \
    StructField("username",StringType(),False), \
    StructField("acctdesc",StringType(),True), \
    StructField("n_following", IntegerType(), True), \
    StructField("n_followers", IntegerType(), True), \
    StructField("n_totaltweets", IntegerType(), True), \
  ])

### 1.2 Define Schema of Tweets DB

Has to be in accordance with BigQuery Table

In [None]:
TWEETS_SCHEMA = StructType([ \
    StructField("tweetId",LongType(),False), \
    StructField("tweetcreatedts",TimestampType(),False), \
    StructField("userId",LongType(),False), \
    StructField("location",StringType(),True), \
    StructField("language", StringType(), True), \
    StructField("retweetcount", IntegerType(), True), \
    StructField("favorite_count", IntegerType(), True), \
    StructField("tweet_text", StringType(), True), \
    StructField("hashtags", StringType(), False), \
    StructField("coordinates", StringType(), True), \
    StructField("extractedts", TimestampType(), False) \
  ])

### Connect to Users DB

In [25]:
USERS_TABLE_BIGQUERY = "cadeira-nuvem-2122:bq_cloud_2122.db_users"
dfUsers = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .load()

## We only want the userId to compare
dfUsers = dfUsers.select("userId")

dfUsers.write.mode("append").saveAsTable("ALL_USERS")

Py4JJavaError: An error occurred while calling o115.load.
: java.lang.ClassNotFoundException: 
Failed to find data source: bigquery. Please find packages at
http://spark.apache.org/third-party-projects.html
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:443)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:670)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: bigquery.DefaultSource
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:656)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:656)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:656)
	... 15 more


### 1.3 Define Schema of Tweets DB

Has to be in accordance with BigQuery Table

### Iterate over CSV's

* Control flow (Pull user data from BigQuery)
* Update / Add Users
* Add Tweets

### Mock User data

In [31]:
import datetime;
spark.sql('drop table if exists USER_DATA_TEST')
columns = ['userId','usercreatedts', 'username', 'acctdesc', 'n_following', 'n_followers', 'n_totaltweets']
data = [(1068621782, datetime.datetime.now(), 'BklynContractor', 'Nada', 1, 1, 2)]
rdd = spark.sparkContext.parallelize(data)

spark.createDataFrame(rdd, schema = USERS_SCHEMA).toDF(*columns).select('userId').write.mode("append").saveAsTable("USER_DATA_TEST")
spark.sql('SELECT * FROM USER_DATA_TEST')

userId
1068621782


### Iteration

In [32]:
# https://stackoverflow.com/questions/51751852/dataproc-reading-from-google-cloud-storage
# https://stackoverflow.com/questions/61197811/can-i-read-csv-files-from-google-storage-using-spark-in-more-than-one-executor
for csv_file in FILES_TO_PROCESS:
    ## DOES NOT WORK
    ##sc = SparkContext.getOrCreate()
    ##rdd_csv = sc.wholeTextFiles(f"{BUCKET_LINK}/{csv_file}")
    ##rdd_csv.collect()
    ## WORKS - BUT DOES IT WORK FOR MULTIPLE FILES READ?
    csv_file = (spark
                .read
                .format("csv")
                .option("wholeFile", True)
                .option("multiline",True)
                .option("header", True)
                .option("inferSchema", "true")
                .option("dateFormat", "yyyy-MM-dd")
                .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
                .schema(GLOBAL_SCHEMA)
                .load("input/220312-subset.csv"))
    
    csv_file.createOrReplaceTempView("DATA_FILE")
    
    
    #### START: USER DATA ####
    ## Duplicated Users Data has the information for update. When it's implemented, should be the table giving the data to update users
    spark.sql("""SELECT A.* FROM DATA_FILE A INNER JOIN USER_DATA_TEST B ON A.userId == B.UserId""").write.mode("overwrite").saveAsTable("DUPLICATED_USERS")
    
    ### NEW_USER_DATA is the table that's thrown into bigquery
    spark.sql("""SELECT DISTINCT
                    A.userId, 
                    A.usercreatedts, 
                    A.username, 
                    A.acctdesc, 
                    A.n_following, 
                    A.n_followers, 
                    A.n_totaltweets 
                FROM DATA_FILE A INNER JOIN DUPLICATED_USERS B ON A.userId <> B.UserId""").createOrReplaceTempView("NEW_USER_DATA")
    
    ## This is the control table in the script, that if several CSV files are loaded, the chances of an user being loaded twice are nullified.
    spark.sql("""INSERT INTO TABLE USER_DATA_TEST SELECT userId FROM NEW_USER_DATA""")
    
    spark.sql('SELECT * FROM NEW_USER_DATA').write.mode('append') \
      .format('bigquery') \
      .option('table', USERS_TABLE_BIGQUERY)
    
    ### TODO ###
    #UPDATE USER DATA THAT'S IN DUPLICATES
    # CHANGE USER_DATA_TEST TO ALL_USERS
    #### END: USER DATA ####
    
    #### START: Tweets Data ####
    spark.sql("""SELECT DISTINCT
                A.tweetId, 
                A.tweetcreatedts, 
                A.userId, 
                A.location, 
                A.language, 
                A.retweetcount, 
                A.favouritecount,
                A.tweettext,
                A.hashtags,
                A.coordinates,
                A.extractedts
                FROM DATA_FILE""").write.mode('append') \
                                  .format('bigquery') \
                                  .option('table', USERS_TABLE_BIGQUERY)
    #### END: USER DATA ####

22/04/08 22:31:13 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: userid, username, acctdesc, following, followers, totaltweets, usercreatedts
 Schema: userId, username, acctdesc, n_following, n_followers, n_totaltweets, usercreatedts
Expected: n_following but found: following
CSV file: file:///home/portugapt/Documents/Cloud%202122/dev/ETL/input/220312-subset.csv


In [25]:
spark.sql("""INSERT INTO TABLE USER_DATA_TEST SELECT userId FROM NEW_USER_DATA""")

22/04/08 22:30:17 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: userid, username, acctdesc, following, followers, totaltweets, usercreatedts
 Schema: userId, username, acctdesc, n_following, n_followers, n_totaltweets, usercreatedts
Expected: n_following but found: following
CSV file: file:///home/portugapt/Documents/Cloud%202122/dev/ETL/input/220312-subset.csv


### CSV file to User dataframe

In [41]:
csv_file.createOrReplaceTempView("DATA_FILE")

In [21]:
spark.sql("SELECT * FROM DUPLICATED_USERS")

userId
1068621782


In [15]:
spark.sql("SELECT userId, usercreatedts, username, acctdesc, n_following, n_followers, n_totaltweets FROM DATA_FILE").createOrReplaceTempView("NEW_USER_DATA")

In [None]:
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
spark.conf.set('temporaryGcsBucket', BUCKET)

#

words = spark.read.format('bigquery') \
  .option('table', 'bigquery-public-data:samples.shakespeare') \
  .load()
words.createOrReplaceTempView('words')


In [None]:
dfTweet = spark.read.option("header",True).csv("./input/data.csv")

In [None]:
dfTweet.printSchema()

In [None]:
dfTweet.select('userid', 'username').collect()[0]

# Write Read CSV's to control file

In [None]:
## Join FILES_TO_PROCESS and FILES_ALREADY_READ in PROCESSED_FILES
PROCESSED_FILES = []

In [None]:
# Update txt file after data is processed
# https://stackoverflow.com/questions/43682521/writing-data-to-google-cloud-storage-using-python
def write_txt_to_bucket(bucket_name, destination_file_name, files_list):
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(destination_file_name) 
    ## Use bucket.get_blob('path/to/existing-blob-name.txt') to write to existing blobs
    with blob.open(mode='w') as f:
        f.write('\n'.join(files_list))
            
write_txt_to_bucket('cloud-computing-2122-bjr', 'control/read_files.txt', PROCESSED_FILES)