In [None]:
from collections import Counter

In [None]:
print("This is a tutorial!")

This is a tutorial!


<a id='text-cells'></a>
### Text cells

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Hello world!

<a id='access-to-the-shell'></a>
### Access to the shell

In [None]:
ls

[0m[01;34mdrive[0m/        [01;34mspark-3.1.1-bin-hadoop3.2[0m/     spark-3.1.1-bin-hadoop3.2.tgz.1
[01;34msample_data[0m/  spark-3.1.1-bin-hadoop3.2.tgz  spark-3.1.1-bin-hadoop3.2.tgz.2


In [None]:
pwd

'/content'

<a id='installing-spark'></a>
### Installing Spark

Install Dependencies:


1.   Java 8
2.   Apache Spark with hadoop and
3.   Findspark (used to locate the spark in the system)


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

Set Environment Variables:

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
!ls

drive			       spark-3.1.1-bin-hadoop3.2.tgz.1
sample_data		       spark-3.1.1-bin-hadoop3.2.tgz.2
spark-3.1.1-bin-hadoop3.2      spark-3.1.1-bin-hadoop3.2.tgz.3
spark-3.1.1-bin-hadoop3.2.tgz


In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

<a id='exploring-the-dataset'></a>
## Exploring the Dataset

<a id='loading-the-dataset'></a>
### Loading the Dataset

In [None]:
!ls

drive			       spark-3.1.1-bin-hadoop3.2.tgz.1
sample_data		       spark-3.1.1-bin-hadoop3.2.tgz.2
spark-3.1.1-bin-hadoop3.2      spark-3.1.1-bin-hadoop3.2.tgz.3
spark-3.1.1-bin-hadoop3.2.tgz


In [None]:
# Load data from csv to a dataframe. 
# header=True means the first row is a header 
# sep=';' means the column are seperated using ''
df = spark.read.csv('/content/drive/MyDrive/BLOCKCHAIN/combined-csv-files.csv',header=True,inferSchema=True)
df.show()

+--------------------+-------------------+
|             address|        eth_balance|
+--------------------+-------------------+
|0x9874f287b5a05c9...|                  0|
|0x8215582518610d7...|  24175927118817482|
|0xf4be7f6f2529450...|                  0|
|0xac53fd148679e0b...|        10000000004|
|0xd809a6957f057f9...|                  0|
|0x76dcc99f6935785...|                  0|
|0xb82faa264ea92ca...|     15538379333120|
|0x1d0ccd03422ceef...|                  0|
|0x5961c547503ee0a...|                  0|
|0x4a65d343b5b7f55...|                  0|
|0xda885bae09fa3db...|    254673894510000|
|0x7ac34ef35edfa28...|1404744140280226447|
|0xad649bcaff3495e...|                  0|
|0xf19630799ec4783...|                  0|
|0x8ebb9dbd42671df...|      2015730000000|
|0xa9fd6ed5172b4c2...|    474000000000000|
|0x663c984a6898679...|                  0|
|0xc5f62d1bbde43dd...|                  0|
|0x1dafe114cfb27af...|                  0|
|0xcd6758e0bb9555b...|                  0|
+----------

The above command loads our data from into a dataframe (DF). A dataframe is a 2-dimensional labeled data structure with columns of potentially different types.

<a id='viewing-the-dataframe'></a>
### Viewing the Dataframe

There are a couple of ways to view your dataframe(DF) in PySpark:

1.   `df.take(5)` will return a list of five Row objects. 
2.   `df.collect()` will get all of the data from the entire DataFrame. Be really careful when using it, because if you have a large data set, you can easily crash the driver node. 
3.   `df.show()` is the most commonly used method to view a dataframe. There are a few parameters we can pass to this method, like the number of rows and truncaiton. For example, `df.show(5, False)` or ` df.show(5, truncate=False)` will show the entire data wihtout any truncation.
4.   `df.limit(5)` will **return a new DataFrame** by taking the first n rows. As spark is distributed in nature, there is no guarantee that `df.limit()` will give you the same results each time.

Let us see some of them in action below:

In [None]:
df.show(5, truncate=False)

+------------------------------------------+-----------------+
|address                                   |eth_balance      |
+------------------------------------------+-----------------+
|0x9874f287b5a05c945cbedffb0bfd9fbcde6c01f5|0                |
|0x8215582518610d7adf8c9939ea03ca3baa6839f6|24175927118817482|
|0xf4be7f6f252945036427ed53ee869b9d09b76e3c|0                |
|0xac53fd148679e0b9ee448d5928ac7aa1cecbe10d|10000000004      |
|0xd809a6957f057f91954245b60e2056beb5faf037|0                |
+------------------------------------------+-----------------+
only showing top 5 rows



In [None]:
df.limit(5)

address,eth_balance
0x9874f287b5a05c9...,0
0x8215582518610d7...,24175927118817482
0xf4be7f6f2529450...,0
0xac53fd148679e0b...,10000000004
0xd809a6957f057f9...,0


In [None]:

df.na.drop().show()

+--------------------+-------------------+
|             address|        eth_balance|
+--------------------+-------------------+
|0x9874f287b5a05c9...|                  0|
|0x8215582518610d7...|  24175927118817482|
|0xf4be7f6f2529450...|                  0|
|0xac53fd148679e0b...|        10000000004|
|0xd809a6957f057f9...|                  0|
|0x76dcc99f6935785...|                  0|
|0xb82faa264ea92ca...|     15538379333120|
|0x1d0ccd03422ceef...|                  0|
|0x5961c547503ee0a...|                  0|
|0x4a65d343b5b7f55...|                  0|
|0xda885bae09fa3db...|    254673894510000|
|0x7ac34ef35edfa28...|1404744140280226447|
|0xad649bcaff3495e...|                  0|
|0xf19630799ec4783...|                  0|
|0x8ebb9dbd42671df...|      2015730000000|
|0xa9fd6ed5172b4c2...|    474000000000000|
|0x663c984a6898679...|                  0|
|0xc5f62d1bbde43dd...|                  0|
|0x1dafe114cfb27af...|                  0|
|0xcd6758e0bb9555b...|                  0|
+----------

<a id='viewing-dataframe-columns'></a>
### Viewing Dataframe Columns

In [None]:
df.columns

['address', 'eth_balance']

<a id='dataframe-schema'></a>
### Dataframe Schema

There are two methods commonly used to view the data types of a dataframe:

In [None]:
df.dtypes

[('address', 'string'), ('eth_balance', 'decimal(26,0)')]

In [None]:
df.printSchema()

root
 |-- address: string (nullable = true)
 |-- eth_balance: decimal(26,0) (nullable = true)



In [None]:
df.count()

218028252

<a id='explicit-schema-inference'></a>
#### Defining Schema Explicitly

In [None]:
from pyspark.sql.types import *
df.columns

['address', 'eth_balance']

As we can see here, the data has been successully loaded with the specified datatypes.

<a id='selecting-columns'></a>
### Selecting Columns

There are multiple ways to do a select in PySpark. You can find how they differ and how each below:

In [None]:
# 1st method
# Column name is case sensitive in this usage
print(df.address)
print("*"*20)
df.select(df.address).show(truncate=False)

Column<'address'>
********************
+------------------------------------------+
|address                                   |
+------------------------------------------+
|0x9874f287b5a05c945cbedffb0bfd9fbcde6c01f5|
|0x8215582518610d7adf8c9939ea03ca3baa6839f6|
|0xf4be7f6f252945036427ed53ee869b9d09b76e3c|
|0xac53fd148679e0b9ee448d5928ac7aa1cecbe10d|
|0xd809a6957f057f91954245b60e2056beb5faf037|
|0x76dcc99f6935785dfb494773d98a09613147d4be|
|0xb82faa264ea92ca78a4de6d76089457d0f07f2a2|
|0x1d0ccd03422ceefddafa55a0acff36c3789827bf|
|0x5961c547503ee0a1e185fc29dbc87fb5f0c98071|
|0x4a65d343b5b7f55d668c9572a672c0a6685e5776|
|0xda885bae09fa3dbfb1faeb84867bbb3fb68bfbb6|
|0x7ac34ef35edfa28cb4961e7a383c3be8f599b1a3|
|0xad649bcaff3495e70cfc73fbce2dd19a2760073b|
|0xf19630799ec47834aee3995e61e41d699e6579b0|
|0x8ebb9dbd42671dfb5b054c8d3cd717058d658658|
|0xa9fd6ed5172b4c266948ce2ae41336a9b58155fb|
|0x663c984a689867998f19f11684051abb07d287af|
|0xc5f62d1bbde43dd086a25a8fd99a1675160cbc04|
|0x1dafe114cfb27

**NOTE:**

> **We can't always use the dot notation because this will break when the column names have reserved names or attributes to the data frame class. Additionally, the column names are case sensitive in nature so we need to always make sure the column names have been changed to a paticular case before using it.**



In [None]:
# 2nd method
# Column name is case insensitive here
print(df['address'])
print("*"*20)
df.select(df['address']).show(truncate=False)

Column<'address'>
********************
+------------------------------------------+
|address                                   |
+------------------------------------------+
|0x9874f287b5a05c945cbedffb0bfd9fbcde6c01f5|
|0x8215582518610d7adf8c9939ea03ca3baa6839f6|
|0xf4be7f6f252945036427ed53ee869b9d09b76e3c|
|0xac53fd148679e0b9ee448d5928ac7aa1cecbe10d|
|0xd809a6957f057f91954245b60e2056beb5faf037|
|0x76dcc99f6935785dfb494773d98a09613147d4be|
|0xb82faa264ea92ca78a4de6d76089457d0f07f2a2|
|0x1d0ccd03422ceefddafa55a0acff36c3789827bf|
|0x5961c547503ee0a1e185fc29dbc87fb5f0c98071|
|0x4a65d343b5b7f55d668c9572a672c0a6685e5776|
|0xda885bae09fa3dbfb1faeb84867bbb3fb68bfbb6|
|0x7ac34ef35edfa28cb4961e7a383c3be8f599b1a3|
|0xad649bcaff3495e70cfc73fbce2dd19a2760073b|
|0xf19630799ec47834aee3995e61e41d699e6579b0|
|0x8ebb9dbd42671dfb5b054c8d3cd717058d658658|
|0xa9fd6ed5172b4c266948ce2ae41336a9b58155fb|
|0x663c984a689867998f19f11684051abb07d287af|
|0xc5f62d1bbde43dd086a25a8fd99a1675160cbc04|
|0x1dafe114cfb27

In [None]:
# 3rd method
# Column name is case insensitive here
from pyspark.sql.functions import col
df.select(col('address')).show(truncate=False)

+------------------------------------------+
|address                                   |
+------------------------------------------+
|0x9874f287b5a05c945cbedffb0bfd9fbcde6c01f5|
|0x8215582518610d7adf8c9939ea03ca3baa6839f6|
|0xf4be7f6f252945036427ed53ee869b9d09b76e3c|
|0xac53fd148679e0b9ee448d5928ac7aa1cecbe10d|
|0xd809a6957f057f91954245b60e2056beb5faf037|
|0x76dcc99f6935785dfb494773d98a09613147d4be|
|0xb82faa264ea92ca78a4de6d76089457d0f07f2a2|
|0x1d0ccd03422ceefddafa55a0acff36c3789827bf|
|0x5961c547503ee0a1e185fc29dbc87fb5f0c98071|
|0x4a65d343b5b7f55d668c9572a672c0a6685e5776|
|0xda885bae09fa3dbfb1faeb84867bbb3fb68bfbb6|
|0x7ac34ef35edfa28cb4961e7a383c3be8f599b1a3|
|0xad649bcaff3495e70cfc73fbce2dd19a2760073b|
|0xf19630799ec47834aee3995e61e41d699e6579b0|
|0x8ebb9dbd42671dfb5b054c8d3cd717058d658658|
|0xa9fd6ed5172b4c266948ce2ae41336a9b58155fb|
|0x663c984a689867998f19f11684051abb07d287af|
|0xc5f62d1bbde43dd086a25a8fd99a1675160cbc04|
|0x1dafe114cfb27afe08becf79ce641ab1ef60d5e0|
|0xcd6758e

<a id='selecting-multiple-columns'></a>
### Selecting Multiple Columns

In [None]:
# 1st method
# Column name is case sensitive in this usage
print(df.address, df.address)
print("*"*40)
df.select(df.address, df.eth_balance).show(truncate=False)

Column<'address'> Column<'address'>
****************************************
+------------------------------------------+-------------------+
|address                                   |eth_balance        |
+------------------------------------------+-------------------+
|0x9874f287b5a05c945cbedffb0bfd9fbcde6c01f5|0                  |
|0x8215582518610d7adf8c9939ea03ca3baa6839f6|24175927118817482  |
|0xf4be7f6f252945036427ed53ee869b9d09b76e3c|0                  |
|0xac53fd148679e0b9ee448d5928ac7aa1cecbe10d|10000000004        |
|0xd809a6957f057f91954245b60e2056beb5faf037|0                  |
|0x76dcc99f6935785dfb494773d98a09613147d4be|0                  |
|0xb82faa264ea92ca78a4de6d76089457d0f07f2a2|15538379333120     |
|0x1d0ccd03422ceefddafa55a0acff36c3789827bf|0                  |
|0x5961c547503ee0a1e185fc29dbc87fb5f0c98071|0                  |
|0x4a65d343b5b7f55d668c9572a672c0a6685e5776|0                  |
|0xda885bae09fa3dbfb1faeb84867bbb3fb68bfbb6|254673894510000    |
|0x7ac34ef35e

In [None]:
df = df.withColumn("no_of_ethereum",col("eth_balance")* 1)


In [None]:
import pandas as pd
import numpy as np
import time
import datetime;

In [None]:
#Logging time of start of execution
ct = datetime.datetime.now()
print("current time:-", ct)

current time:- 2022-07-22 10:32:41.841336


In [None]:
df = df.withColumn("ethereum_price",col("no_of_ethereum")* 1582)           # ethereium price in dollar 1 eth=1526 $


In [None]:
df.show()

+--------------------+-------------------+-------------------+--------------------+
|             address|        eth_balance|     no_of_ethereum|      ethereum_price|
+--------------------+-------------------+-------------------+--------------------+
|0x9874f287b5a05c9...|                  0|                  0|                   0|
|0x8215582518610d7...|  24175927118817482|  24175927118817482|38246316701969256524|
|0xf4be7f6f2529450...|                  0|                  0|                   0|
|0xac53fd148679e0b...|        10000000004|        10000000004|      15820000006328|
|0xd809a6957f057f9...|                  0|                  0|                   0|
|0x76dcc99f6935785...|                  0|                  0|                   0|
|0xb82faa264ea92ca...|     15538379333120|     15538379333120|   24581716104995840|
|0x1d0ccd03422ceef...|                  0|                  0|                   0|
|0x5961c547503ee0a...|                  0|                  0|              

In [None]:
from pyspark.sql.functions import when
df3 = df.withColumn('ethereum_range',
         when((df.no_of_ethereum>=0) & (df.no_of_ethereum<10),"0-10")
        .when((df.no_of_ethereum>=10) & (df.no_of_ethereum<50),"10-50")
        .when((df.no_of_ethereum>=50) & (df.no_of_ethereum<100),"50-100")
        .when((df.no_of_ethereum>=100) & (df.no_of_ethereum<500),"100-500")
        .when((df.no_of_ethereum>=500) & (df.no_of_ethereum<1000),"500-1k")
        .when((df.no_of_ethereum>=1000) & (df.no_of_ethereum<5000),"1k-5k")
        .when((df.no_of_ethereum>=5000) & (df.no_of_ethereum<10000),"5k-10k")
        .otherwise('10k+'))

In [None]:
df3.show()

+--------------------+-------------------+-------------------+--------------------+--------------+
|             address|        eth_balance|     no_of_ethereum|      ethereum_price|ethereum_range|
+--------------------+-------------------+-------------------+--------------------+--------------+
|0x9874f287b5a05c9...|                  0|                  0|                   0|          0-10|
|0x8215582518610d7...|  24175927118817482|  24175927118817482|38246316701969256524|          10k+|
|0xf4be7f6f2529450...|                  0|                  0|                   0|          0-10|
|0xac53fd148679e0b...|        10000000004|        10000000004|      15820000006328|          10k+|
|0xd809a6957f057f9...|                  0|                  0|                   0|          0-10|
|0x76dcc99f6935785...|                  0|                  0|                   0|          0-10|
|0xb82faa264ea92ca...|     15538379333120|     15538379333120|   24581716104995840|          10k+|
|0x1d0ccd0

In [None]:
from pyspark.sql.functions import desc
df3.groupBy("ethereum_range").count().orderBy(desc("count"))  # or  df3.groupBy('ethereum_range').count().show()

ethereum_range,count
0-10,136337831
10k+,80863074
5k-10k,355258
1k-5k,336446
10-50,58338
100-500,37650
500-1k,23879
50-100,15776


In [None]:
dataF=df3.select("ethereum_range").count()

In [None]:
#creating groups for dollar value of bitcoins owned in a wallet
from pyspark.sql.functions import when
df3 = df3.withColumn('ethereum_dollar_range',
            when((df.ethereum_price>=0) & (df.ethereum_price<10000),'0-$10K')
            .when((df.ethereum_price>=10000) & (df.ethereum_price<100000),'$10K-$100K')
            .when((df.ethereum_price>=100000) & (df.ethereum_price<500000),'100K-500K')
            .when((df.ethereum_price>=500000) & (df.ethereum_price<1000000),'500K-1M') 
            .when((df.ethereum_price>=1000000) & (df.ethereum_price<5000000),'1M-5M')
            .when((df.ethereum_price >=5000000) & (df.ethereum_price<10000000),'5M-10M')
            .when((df.ethereum_price>=10000000) & (df.ethereum_price<25000000),'10M-25M')
            .when((df.ethereum_price>=25000000) & (df.ethereum_price<50000000),'25M-50M')
            .when((df.ethereum_price>=50000000) & (df.ethereum_price<100000000),'50M-100M')
            .when((df.ethereum_price>=100000000) & (df.ethereum_price<250000000),'100M-250M')
            .when((df.ethereum_price>=250000000) & (df.ethereum_price<1000000000),'250M-1B')
            .when((df.ethereum_price>=1000000000) & (df.ethereum_price<10000000000),'1B-10B')
            .otherwise('>10B+'))

In [None]:
df3.show()

+--------------------+-------------------+-------------------+--------------------+--------------+---------------------+
|             address|        eth_balance|     no_of_ethereum|      ethereum_price|ethereum_range|ethereum_dollar_range|
+--------------------+-------------------+-------------------+--------------------+--------------+---------------------+
|0x9874f287b5a05c9...|                  0|                  0|                   0|          0-10|               0-$10K|
|0x8215582518610d7...|  24175927118817482|  24175927118817482|38246316701969256524|          10k+|                >10B+|
|0xf4be7f6f2529450...|                  0|                  0|                   0|          0-10|               0-$10K|
|0xac53fd148679e0b...|        10000000004|        10000000004|      15820000006328|          10k+|                >10B+|
|0xd809a6957f057f9...|                  0|                  0|                   0|          0-10|               0-$10K|
|0x76dcc99f6935785...|          

In [None]:
from pyspark.sql.functions import desc
df3.groupBy("ethereum_dollar_range").count().orderBy(desc("count"))  # or  df3.groupBy('ethereum_dollar_range').count().show()

ethereum_dollar_range,count
0-$10K,136326454
>10B+,80600287
5M-10M,515709
10M-25M,217168
1M-5M,79656
$10K-$100K,73782
1B-10B,73116
25M-50M,49213
100K-500K,45995
500K-1M,19558


In [None]:
dataF=df3.select("ethereum_dollar_range","ethereum_price","no_of_ethereum")

In [None]:
dataF.show()

+---------------------+--------------------+-------------------+
|ethereum_dollar_range|      ethereum_price|     no_of_ethereum|
+---------------------+--------------------+-------------------+
|               0-$10K|                   0|                  0|
|                >10B+|38246316701969256524|  24175927118817482|
|               0-$10K|                   0|                  0|
|                >10B+|      15820000006328|        10000000004|
|               0-$10K|                   0|                  0|
|               0-$10K|                   0|                  0|
|                >10B+|   24581716104995840|     15538379333120|
|               0-$10K|                   0|                  0|
|               0-$10K|                   0|                  0|
|               0-$10K|                   0|                  0|
|                >10B+|  402894101114820000|    254673894510000|
|                >10B+|22223052299233182...|1404744140280226447|
|               0-$10K|  

In [None]:
dataF.write.options(header=True).csv("/content/drive/MyDrive/eth_dollar_price_range_sum.csv")

In [None]:
# df5= df2.groupBy('ethereum_dollar_range').count().show()

In [None]:
dataF2=df3.select("ethereum_range","ethereum_price","no_of_ethereum")

https://www.youtube.com/watch?v=I0V8H6fi2G4

In [None]:
dataF2.show()

+--------------+--------------------+-------------------+
|ethereum_range|      ethereum_price|     no_of_ethereum|
+--------------+--------------------+-------------------+
|          0-10|                   0|                  0|
|          10k+|38246316701969256524|  24175927118817482|
|          0-10|                   0|                  0|
|          10k+|      15820000006328|        10000000004|
|          0-10|                   0|                  0|
|          0-10|                   0|                  0|
|          10k+|   24581716104995840|     15538379333120|
|          0-10|                   0|                  0|
|          0-10|                   0|                  0|
|          0-10|                   0|                  0|
|          10k+|  402894101114820000|    254673894510000|
|          10k+|22223052299233182...|1404744140280226447|
|          0-10|                   0|                  0|
|          0-10|                   0|                  0|
|          10k

In [None]:
dataF2.write.options(header=True).csv("/content/drive/MyDrive/ethereum_result/ethereum_price_range_sum.csv")

REference: https://dbmstutorials.com/pyspark/spark-dataframe-to-file-part-1.html
