In [1]:
!pip install nest_asyncio asyncio requests_toolbelt pandas fabric-sdk-py



Start the Hyperledger Fabric ledger using this command:
```bash
$ sh script.sh
```

#### Import Necessary Libraries

In [2]:
import pandas as pd
import os
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
from typing import List
import asyncio
from hfc.fabric import Client
import json
from io import StringIO
import nest_asyncio
nest_asyncio.apply()


#### Initialize the Hyperledger Fabric Client

In [3]:
# Identifiers of the channel and chaincode that the ledger helpers target.
channel_name = 'mychannel'
chaincode_name = 'autodata_chaincode'

# Build the Fabric client from the local network profile, look up the Org1
# admin user, and register the channel on the client.
client = Client(net_profile="./network.json")
org1_admin = client.get_user(org_name='org1.example.com', name='Admin')
client.new_channel(channel_name)

Init client with profile=./network.json
create org with name=org1.example.com
create org with name=org2.example.com
create ca with name=ca-org1
create ca with name=ca-org2
Import orderers = dict_keys(['orderer.example.com'])
Import peers = dict_keys(['peer0.org1.example.com', 'peer0.org2.example.com'])
New channel with name = mychannel


#### Global Vars and config

In [4]:
SPLIT_SIZE = 33  # Number of rows per split

# SECURITY: the Pinata JWT is a credential and must not live in the notebook.
# It is read from the PINATA_JWT environment variable instead; any token that
# was previously committed here should be revoked and re-issued.
JWT = os.environ.get("PINATA_JWT", "")
if not JWT:
    # Keep the notebook importable, but make the misconfiguration visible:
    # upload_to_ipfs sends this value as the Bearer token.
    print("Warning: PINATA_JWT is not set; Pinata uploads will not be authenticated.")

In [5]:
# Event loop shared by the synchronous helper below.
loop = asyncio.get_event_loop()

def wait(foo):
    """
    Run the awaitable `foo` on the shared event loop until it finishes
    and hand back whatever it produced.
    """
    outcome = loop.run_until_complete(foo)
    return outcome

In [6]:
import asyncio

def run_async(async_func):
    """
    Drive an awaitable on the current event loop.

    When the loop is idle, the awaitable is run to completion and its result
    is returned. When the loop is already running, the awaitable is only
    scheduled and the resulting future is returned, so the caller has to
    await that future to obtain the value.
    """
    current_loop = asyncio.get_event_loop()
    if not current_loop.is_running():
        return current_loop.run_until_complete(async_func)
    return asyncio.ensure_future(async_func)

#### Utility Functions

_CSV and DF Utilities_

In [7]:
def load_csv(file_path: str) -> pd.DataFrame:
    """
    Read the CSV file at `file_path` and return its contents as a DataFrame.
    """
    frame = pd.read_csv(file_path)
    return frame

def save_df(df: pd.DataFrame, file_path: str):
    """
    Write `df` to `file_path` as CSV, leaving out the index column.
    """
    df.to_csv(path_or_buf=file_path, index=False)

def split_dataframe(df: pd.DataFrame, split_size: int) -> List[pd.DataFrame]:
    """
    Split a DataFrame into consecutive row chunks of at most `split_size` rows.

    Args:
        df: Frame to split. Chunks are positional slices, so each keeps the
            index labels of the rows it came from.
        split_size: Maximum number of rows per chunk; must be positive.

    Returns:
        The chunks in row order. The last chunk may be shorter than
        `split_size`; an empty frame yields an empty list.

    Raises:
        ValueError: If `split_size` is zero or negative.
    """
    # A negative step would make range() empty and silently drop every row,
    # and a zero step fails with an unrelated-looking range() error, so the
    # argument is validated up front.
    if split_size <= 0:
        raise ValueError(f"split_size must be a positive integer, got {split_size}")
    return [df.iloc[start:start + split_size] for start in range(0, len(df), split_size)]

def dataframe_to_csv(df: pd.DataFrame, file_path: str):
    """
    Serialise `df` as CSV at `file_path`; the index is not written.
    """
    csv_options = {"index": False}
    df.to_csv(file_path, **csv_options)

def str_to_df(csv_content: str) -> pd.DataFrame:
    """
    Parse CSV text held in memory and return it as a pandas DataFrame.
    """
    return pd.read_csv(StringIO(csv_content))

def delete_file(file_path: str):
    """
    Remove the file at `file_path` from disk.
    """
    target = file_path
    os.remove(target)

_IPFS Utilities_

In [8]:
def upload_to_ipfs(file_path: str) -> str:
    """
    Upload a file to IPFS via Pinata and return the IPFS hash.

    Args:
        file_path: Path of the local file to upload.

    Returns:
        The "IpfsHash" value from Pinata's JSON response when the request
        answers with HTTP 200; otherwise an empty string (the response body
        is printed in that case).
    """
    file_name = os.path.basename(file_path)

    # The context manager closes the file handle once the request has been
    # sent, even if the POST raises.
    with open(file_path, 'rb') as file_handle:
        multipart_data = MultipartEncoder(
            fields={
                'file': (file_name, file_handle, 'application/octet-stream'),
                # json.dumps escapes quotes and backslashes in the file name,
                # which plain string concatenation would turn into invalid JSON.
                'pinataMetadata': json.dumps({"name": file_name}),
                'pinataOptions': '{"cidVersion": 0}'
            }
        )

        headers = {
            'Content-Type': multipart_data.content_type,
            'Authorization': f'Bearer {JWT}'
        }

        response = requests.post('https://api.pinata.cloud/pinning/pinFileToIPFS',
                                 data=multipart_data,
                                 headers=headers)

    if response.status_code == 200:
        ipfs_hash = response.json()["IpfsHash"]
        print(f"File {file_name} successfully uploaded to IPFS with hash: {ipfs_hash}")
        return ipfs_hash
    else:
        print(f"Failed to upload file {file_name}: {response.text}")
        return ""

def fetch_from_ipfs(ipfs_hash: str) -> str:
    """
    Download the content stored under `ipfs_hash` through the Pinata gateway.

    Returns the response body decoded as UTF-8 text. Raises `Exception` when
    the gateway answers with any status other than 200.
    """
    response = requests.get(f"https://gateway.pinata.cloud/ipfs/{ipfs_hash}")
    if response.status_code != 200:
        raise Exception(f"Failed to fetch file from IPFS with hash: {ipfs_hash}")
    # The payload is assumed to be text (e.g. CSV), hence the UTF-8 decode.
    return response.content.decode('utf-8')

def retrieve_data_from_hashes(ipfs_hashes: List[str]) -> pd.DataFrame:
    """
    Fetch every hash in `ipfs_hashes` from IPFS, parse each payload as CSV and
    stack the results into one DataFrame with a fresh 0..n-1 index.

    An empty list of hashes yields an empty DataFrame.
    """
    # Hashes are fetched and parsed one after another, in the order given.
    frames = [str_to_df(fetch_from_ipfs(cid)) for cid in ipfs_hashes]
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)

_Hyperledger Chaincode/Smart Contract Utilities_

In [9]:
async def store_hash_on_contract(user_address: str, ipfs_hash: str):
    """
    Invoke the chaincode function 'AddIPFSHash' with the user address and a
    single IPFS hash, print a confirmation line, and return the invoke response.
    """
    invoke_result = await client.chaincode_invoke(
        requestor=org1_admin,
        channel_name=channel_name,
        peers=['peer0.org1.example.com', 'peer0.org2.example.com'],
        cc_name=chaincode_name,
        fcn='AddIPFSHash',
        args=[user_address, ipfs_hash],
        cc_pattern=None
    )
    print(f"Added IPFS hash {ipfs_hash} on-chain")
    return invoke_result

async def store_hashes_on_contract(user_address: str, ipfs_hashes: List[str]):
    """
    Invoke the chaincode function 'AddIPFSHashes' for a user.

    The hash list is passed to the chaincode as a single JSON-encoded string
    argument next to the user address. Prints a confirmation line and returns
    the invoke response.
    """
    invoke_result = await client.chaincode_invoke(
        requestor=org1_admin,
        channel_name=channel_name,
        peers=['peer0.org1.example.com', 'peer0.org2.example.com'],
        cc_name=chaincode_name,
        fcn='AddIPFSHashes',
        args=[user_address, json.dumps(ipfs_hashes)],
        cc_pattern=None
    )
    print(f"Added IPFS hash {ipfs_hashes} on-chain")
    return invoke_result
    

async def retrieve_hashes_from_contract(user_address: str) -> List[str]:
    """
    Query the chaincode function 'GetIPFSHashes' for `user_address` on
    peer0.org1 and return the value under the 'ipfsHashes' key of the
    JSON response.
    """
    raw_response = await client.chaincode_query(
        requestor=org1_admin,
        peers=['peer0.org1.example.com'],
        channel_name=channel_name,
        cc_name=chaincode_name,
        fcn='GetIPFSHashes',
        args=[user_address]
    )
    payload = json.loads(raw_response)
    return payload['ipfsHashes']

In [10]:
async def upload_user_data(file_path: str, user_address: str):
    """
    Split a user's CSV into chunks, upload each chunk to IPFS, and record the
    resulting hashes on the ledger.

    Args:
        file_path: Path of the CSV file to upload.
        user_address: Identifier the hashes are stored under; it is also used
            as the prefix of the temporary chunk file names.

    Raises:
        RuntimeError: If any chunk fails to upload (upload_to_ipfs returned an
            empty hash). Nothing is written to the ledger in that case.
    """
    # Load CSV
    df = load_csv(file_path)

    # Split DataFrame into chunks of at most SPLIT_SIZE rows
    dfs = split_dataframe(df, SPLIT_SIZE)

    ipfs_hashes = []
    for i, split_df in enumerate(dfs, start=1):
        # Save split DataFrame to a temporary CSV in the working directory
        split_file_path = f"{user_address}_split_{i}.csv"
        dataframe_to_csv(split_df, split_file_path)

        try:
            # Upload to IPFS
            ipfs_hash = upload_to_ipfs(split_file_path)
        finally:
            # Remove the temporary file even when the upload raises, so failed
            # runs do not leave chunk files behind.
            delete_file(split_file_path)

        # upload_to_ipfs signals failure with an empty string. Storing that
        # on-chain would record an unusable hash list, so stop here instead.
        if not ipfs_hash:
            raise RuntimeError(
                f"Upload of split {i} failed; no hashes were stored for user: {user_address}"
            )

        ipfs_hashes.append(ipfs_hash)
        print(f"Uploaded split {i} to IPFS with hash: {ipfs_hash}")

    # Store IPFS hashes on blockchain
    await store_hashes_on_contract(user_address, ipfs_hashes)

    print(f"Stored IPFS hashes on blockchain for user: {user_address}")

In [11]:
async def get_user_data(user_address: str):
    """
    Rebuild a user's dataset from the ledger and IPFS.

    Reads the user's IPFS hashes from the chaincode, prints them, downloads
    and concatenates the referenced CSV chunks, and writes the result to
    "<user_address>_combined.csv" in the working directory.
    """
    ipfs_hashes = await retrieve_hashes_from_contract(user_address)
    print(ipfs_hashes)

    filename = f"{user_address}_combined.csv"
    save_df(retrieve_data_from_hashes(ipfs_hashes), filename)
    print(f"Data saved to {filename}.")

#### Interactive Part:

In [12]:
def display_menu():
    """
    Print the main menu and return the user's raw answer as typed.
    """
    print("<","="*5,"MENU","="*5,">")
    menu_lines = (
        "Select an option:",
        "1. Upload user data",
        "2. Get user data",
        "3. Exit",
    )
    for menu_line in menu_lines:
        print(menu_line)
    return input("Enter your choice (1/2/3): ")

In [14]:
async def main():
    """
    Interactive menu loop.

    Keeps showing the menu until the user picks "3". Option "1" asks for a
    file path and a user address and uploads the data; option "2" asks for a
    user address and downloads the data. Any other answer is reported as
    invalid. After every round except Exit, the loop pauses for a keypress.
    """
    while True:
        choice = display_menu()

        # Exit is handled first so it skips the "continue" prompt below.
        if choice == "3":
            print("Exiting...")
            break

        if choice == "1":
            print("<","="*5,"UPLOAD USER DATA","="*5,">")
            file_path = input("Enter the file path: ")
            user_address = input("Enter the user address: ")
            await upload_user_data(file_path, user_address)
        elif choice == "2":
            print("<","="*5,"GET USER DATA","="*5,">")
            user_address = input("Enter the user address: ")
            await get_user_data(user_address)
        else:
            print("Invalid choice. Try again.")

        input("Enter any key to continue...")

# Entry point: run the interactive menu until the user chooses Exit.
if __name__ == "__main__":
    # Rebinds the module-level `loop` (the same name wait() reads) and blocks
    # until main() returns.
    # NOTE(review): inside a notebook the kernel's loop is presumably already
    # running, so this relies on the earlier nest_asyncio.apply() call - confirm.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

< ===== MENU ===== >
Select an option:
1. Upload user data
2. Get user data
3. Exit
< ===== UPLOAD USER DATA ===== >
File 0x123_split_1.csv successfully uploaded to IPFS with hash: QmNuUgVgcURXFWM6HRyAm9gC3ZQQ1MXSsEkCZRUcasdmVf
Uploaded split 1 to IPFS with hash: QmNuUgVgcURXFWM6HRyAm9gC3ZQQ1MXSsEkCZRUcasdmVf
File 0x123_split_2.csv successfully uploaded to IPFS with hash: QmfUkMrmkJdvw5KNVHYRusejFu9F9rLdoEoR3kWLcGwPgm
Uploaded split 2 to IPFS with hash: QmfUkMrmkJdvw5KNVHYRusejFu9F9rLdoEoR3kWLcGwPgm
File 0x123_split_3.csv successfully uploaded to IPFS with hash: QmUkcG4JdRhUhKajfBTjUwa2W8yKtwBdCyA9Bt3k37G9yd
Uploaded split 3 to IPFS with hash: QmUkcG4JdRhUhKajfBTjUwa2W8yKtwBdCyA9Bt3k37G9yd
Added IPFS hash ['QmNuUgVgcURXFWM6HRyAm9gC3ZQQ1MXSsEkCZRUcasdmVf', 'QmfUkMrmkJdvw5KNVHYRusejFu9F9rLdoEoR3kWLcGwPgm', 'QmUkcG4JdRhUhKajfBTjUwa2W8yKtwBdCyA9Bt3k37G9yd'] on-chain
Stored IPFS hashes on blockchain for user: 0x123
< ===== MENU ===== >
Select an option:
1. Upload user data
2. Get user data
