In [2]:
%%writefile Dockerfile
# Use the official Apache Beam SDK image as the base
FROM apache/beam_python3.10_sdk:2.53.0

# curl and unzip are required by the rclone and protoc download steps below,
# so install them explicitly (the original `apt-get install -y` named no
# packages). The apt lists are removed to keep the layer small.
RUN apt-get update \
    && apt-get install -y --no-install-recommends ca-certificates curl unzip \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies installed on top of the base image
RUN pip install \
    google-cloud-pubsub \
    paramiko \
    google-cloud-secret-manager

# Download and install rclone, then remove the downloaded archive and the
# extracted directory so they do not stay in the image.
RUN curl -fLO https://downloads.rclone.org/rclone-current-linux-amd64.zip \
    && unzip rclone-current-linux-amd64.zip \
    && cd rclone-*-linux-amd64 \
    && cp rclone /usr/bin/ \
    && chown root:root /usr/bin/rclone \
    && chmod 755 /usr/bin/rclone \
    && mkdir -p /usr/local/share/man/man1 \
    && cp rclone.1 /usr/local/share/man/man1/ \
    && cd .. \
    && rm -rf rclone-*-linux-amd64 rclone-current-linux-amd64.zip
    # && mandb

# Fix protoc and gcp-storage problem
ENV PB_REL="https://github.com/protocolbuffers/protobuf/releases"
# An `export PATH=...` inside RUN only lasts for that single build step.
# ENV keeps protoc on PATH for later layers and for the running container.
ENV PATH="${PATH}:/root/.local/bin"
RUN curl -fLO $PB_REL/download/v30.2/protoc-30.2-linux-x86_64.zip \
    && unzip protoc-30.2-linux-x86_64.zip -d /root/.local \
    && rm protoc-30.2-linux-x86_64.zip \
    && pip install protobuf \
    && pip install --upgrade google-cloud-storage \
    && pip install --upgrade grpcio

# Set the working directory (optional, but good practice)
WORKDIR /app

# Copy your Beam pipeline code into the container
# COPY ./pipeline.py /app/pipeline.py


# Keep the Beam boot binary as the entrypoint
ENTRYPOINT ["/opt/apache/beam/boot"]

Overwriting Dockerfile


In [2]:
%%writefile pipeline.py
import os
import subprocess
import csv
import tempfile

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions



class RcloneMixin:
    """Mixin that prepares an rclone SFTP remote from a Secret Manager key.

    ``_prepare_rclone`` fetches a private key from Secret Manager, writes it
    to the temp directory and generates a minimal ``rclone.conf`` pointing at
    that key. After the call, ``self.key_path`` and ``self.conf_path`` hold
    the locations of the two files.
    """

    # Connection settings. They are class attributes so a subclass can
    # override them without re-implementing ``_prepare_rclone``.
    PROJECT       = 'omar-ismail-test-project'
    SECRET_NAME   = 'sftp_default_key'
    RCLONE_REMOTE = 'myvm'  # section name written to rclone.conf
    HOST          = '34.69.92.10'
    USER          = 'omar_ismail'
    PORT          = 22

    def _prepare_rclone(self):
        """Write the SFTP key file and the rclone config file.

        Sets:
            self.key_path: path of the private key file (mode 0o600).
            self.conf_path: path of the generated rclone config.

        Raises:
            Whatever the Secret Manager client raises when the secret cannot
            be read, and ``OSError`` if the files cannot be written.
        """
        # Imported locally, like the other per-method imports in this file,
        # so the method does not rely on module-level globals being present
        # wherever it ends up running.
        import os
        import tempfile
        from google.cloud import secretmanager

        # 1) Fetch key from Secret Manager
        client = secretmanager.SecretManagerServiceClient()
        name = f"projects/{self.PROJECT}/secrets/{self.SECRET_NAME}/versions/latest"
        response = client.access_secret_version(request={"name": name})
        key_str = response.payload.data.decode('utf-8')

        # 2) Write key to secure temp file
        self.key_path = os.path.join(tempfile.gettempdir(), 'sftp_key')
        print(f"key location: {self.key_path}")
        # Create the file with owner-only permissions from the start, so the
        # key is never readable by others between creation and chmod.
        fd = os.open(self.key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as f:
            f.write(key_str)
        # The mode passed to os.open only applies to newly created files, so
        # tighten a file left over from a previous run as well.
        os.chmod(self.key_path, 0o600)

        # 3) Write a minimal rclone.conf
        self.conf_path = os.path.join(tempfile.gettempdir(), 'rclone.conf')
        conf_lines = [
            f"[{self.RCLONE_REMOTE}]",
            "type = sftp",
            f"host = {self.HOST}",
            f"user = {self.USER}",
            f"port = {self.PORT}",
            f"key_file = {self.key_path}",
        ]
        with open(self.conf_path, 'w') as f:
            f.write("\n".join(conf_lines) + "\n")
        print("rclone configuration directory", self.conf_path)

class ListFiles(RcloneMixin, beam.DoFn):
    """DoFn that lists the remote directory and emits CSV / Parquet names."""

    def setup(self):
        """Create the rclone key and config files before processing starts."""
        self._prepare_rclone()

    def process(self, _):
        """Run ``rclone lsf`` on the remote directory and yield matching names.

        The input element is ignored. Every output line of the command that
        ends in ``.csv`` or ``.parquet`` is yielded unchanged; all other
        lines are dropped.

        Raises:
            subprocess.CalledProcessError: if rclone exits with a non-zero
                status.
        """
        import subprocess
        RCLONE_REMOTE = 'myvm'
        REMOTE_DIR    = 'customer_test_data'

        print("fetch list of files")
        listing_cmd = [
            'rclone', '--config', self.conf_path,
            'lsf', f"{RCLONE_REMOTE}:{REMOTE_DIR}",
        ]
        raw_listing = subprocess.check_output(listing_cmd)
        out = raw_listing.decode('utf-8').splitlines()
        print("finish fetching", out)

        for fname in out:
            # The two suffixes are mutually exclusive, so at most one branch
            # reports the file before it is emitted.
            if fname.endswith('.csv'):
                print(f"Found csv: {fname}")
            elif fname.endswith('.parquet'):
                print(f"Found parquet: {fname}")
            else:
                continue
            yield fname
            


class FetchWithRclone(RcloneMixin, beam.DoFn):
    """DoFn that copies one remote file into a local temp directory via rclone.

    Each instance keeps its own temp directory and its own log file listing
    the file names it fetched. ``teardown`` uploads that log file to GCS.
    """

    def setup(self):
        """Prepare rclone, a private temp directory and a per-instance logger."""
        # Imported locally so the method does not depend on module globals.
        import logging, os, socket, tempfile

        self._prepare_rclone()
        self.tmpdir = tempfile.mkdtemp()

        # per-worker logger → writes to <tmpdir>/<hostname>.log
        self.worker_id = socket.gethostname()
        self.logfile = os.path.join(self.tmpdir, f"{self.worker_id}.log")
        self._log_handler = logging.FileHandler(self.logfile)
        self._log_handler.setFormatter(logging.Formatter("%(message)s"))
        # logging.getLogger returns the same object for the same name across
        # the whole process. Including the unique temp dir name gives every
        # DoFn instance its own logger, so instances on one host do not
        # write into each other's log files.
        logger_name = f"{self.worker_id}.{os.path.basename(self.tmpdir)}"
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        self.logger.addHandler(self._log_handler)

    def process(self, filename):
        """Copy ``filename`` from the remote directory and yield its local path.

        Args:
            filename: name of a file inside the remote directory.

        Yields:
            The path of the downloaded copy inside ``self.tmpdir``.

        Raises:
            subprocess.CalledProcessError: if rclone exits with a non-zero
                status.
        """
        import os
        import subprocess
        RCLONE_REMOTE = 'myvm'
        REMOTE_DIR    = 'customer_test_data'
        local_path = os.path.join(self.tmpdir, filename)
        print(f"the new tmp file in {local_path}")
        print(f"copying from {RCLONE_REMOTE}:{REMOTE_DIR}/{filename}")
        subprocess.run([
            'rclone', '--config', self.conf_path,
            'copyto',
            f"{RCLONE_REMOTE}:{REMOTE_DIR}/{filename}",
            local_path
        ], check=True)
        self.logger.info(filename)
        print(f"finished copying {local_path}")
        yield local_path

    def teardown(self):
        """Close the log handler and upload the log file to GCS.

        Does nothing beyond cleanup when ``setup`` never got far enough to
        create the log file.
        NOTE(review): confirm the runner guarantees teardown is called; if
        not, this upload is best-effort only.
        """
        import os

        # Detach and close the handler first so the file is fully written
        # and the file descriptor is released before the upload.
        handler = getattr(self, '_log_handler', None)
        logger = getattr(self, 'logger', None)
        if handler is not None:
            if logger is not None:
                logger.removeHandler(handler)
            handler.close()
            self._log_handler = None

        logfile = getattr(self, 'logfile', None)
        if not logfile or not os.path.exists(logfile):
            # setup() failed before the log file existed; nothing to upload.
            return

        from google.cloud import storage
        client = storage.Client()
        bucket = client.bucket("remove_later123")
        blob = bucket.blob(f"dataflow_testing/{os.path.basename(logfile)}")
        blob.upload_from_filename(logfile)


# class CopyToGCS(beam.DoFn):
#     def __init__(self, target_uri):
#         self.target_uri = target_uri
#     def process(self, local_path):

#         from apache_beam.io.filesystems import FileSystems
#         print(f"moving file {local_path} to {self.target_uri}{local_path.split('/')[-1]}")
#         FileSystems.copy([local_path], [self.target_uri+local_path.split('/')[-1]])
#         yield local_path


class UploadToGCS(beam.DoFn):
    """DoFn that uploads a local file to a GCS bucket and deletes the local copy."""

    def __init__(self, bucket_name, prefix=""):
        """Remember the target bucket and the object-name prefix.

        Args:
            bucket_name: name of the destination bucket.
            prefix: optional object-name prefix; leading and trailing
                slashes are removed.
        """
        self.bucket_name = bucket_name
        self.prefix = prefix.strip("/")

    def setup(self):
        """Create the storage client and the bucket handle."""
        from google.cloud import storage
        gcs_client = storage.Client()
        self.client = gcs_client
        self.bucket = gcs_client.bucket(self.bucket_name)

    def process(self, local_path):
        """Upload ``local_path``, remove it locally and yield its ``gs://`` URI.

        The object name is the file's base name, placed under the prefix when
        one was given.
        """
        import os
        filename = os.path.basename(local_path)
        blob_path = f"{self.prefix}/{filename}" if self.prefix else filename

        target_blob = self.bucket.blob(blob_path)
        target_blob.upload_from_filename(local_path)

        # The local copy is deleted only after the upload call has returned.
        os.remove(local_path)
        gcs_uri = f"gs://{self.bucket_name}/{blob_path}"
        print(f"moved file {local_path} to {gcs_uri}")
        yield gcs_uri



# def parse_csv(local_path):
#     import csv
#     with open(local_path, newline='') as f:
#         for row in csv.DictReader(f):
#             yield row


def run():
    """Build and run the pipeline: list remote files, fetch each, upload to GCS.

    The pipeline is executed when the ``with`` block exits.
    """
    pipeline_options = PipelineOptions(
        # streaming=True,
        project='omar-ismail-test-project',
        region='us-central1',
        temp_location='gs://remove_later123/temp',
        # service_account_email='dataflow-worker@omar-ismail-test-project.iam.gserviceaccount.com'
    )

    with beam.Pipeline(options=pipeline_options) as p:
        # A single dummy element triggers exactly one listing call.
        seed = p | 'Init' >> beam.Create([None])
        listed = seed | 'ListFiles' >> beam.ParDo(ListFiles())
        # Presumably here so the fetches can be spread out after the
        # single-element listing step — TODO confirm.
        files = listed | 'Reshuffle' >> beam.Reshuffle()

        fetched = files | 'FetchOne' >> beam.ParDo(FetchWithRclone())

        # note: you can parameterize per-filename if needed
        uploader = UploadToGCS(bucket_name="new_remove", prefix="raw")
        _ = fetched | 'SaveRaw' >> beam.ParDo(uploader)

        # _ = fetched | 'ParseCSV' >> beam.FlatMap(parse_csv) 


# Script entry point: build and run the pipeline only when this file is
# executed directly, not when it is imported.
if __name__ == '__main__':
    run()


Overwriting pipeline.py


In [None]:
# !pip install --upgrade apache-beam[gcp]==2.53.0


Python 3.10.16


In [29]:
# !python pipeline.py \
#   --runner=DataflowRunner \
#   --project=omar-ismail-test-project \
#   --region=us-central1 \
#   --temp_location=gs://remove_later123/dataflow_streaming_test/tmp/ \
#   --staging_location=gs://remove_later123/dataflow_streaming_test/stage/ \
#   --num_workers=1 \
#   --max_num_workers=2 \
#   --autoscaling_algorithm=THROUGHPUT_BASED \
#   --worker_machine_type=e2-small \
#   --experiments=use_runner_v2 \
#   --flink_version=1.15 \
#   --sdk_container_image='gcr.io/omar-ismail-test-project/beam-rclone-image:latest'


In [None]:
# # Rclone binaries
# curl -O https://downloads.rclone.org/rclone-current-linux-amd64.zip
# unzip rclone-current-linux-amd64.zip
# cd rclone-*-linux-amd64

# # move the files
# cp rclone /usr/bin/
# chown root:root /usr/bin/rclone
# chmod 755 /usr/bin/rclone
# # Install manpage
# mkdir -p /usr/local/share/man/man1
# cp rclone.1 /usr/local/share/man/man1/
# # mandb ### this doesn't work for some reason, but i can access rclone now

# # protoc: installed a newer version, but it didn't work correctly
# PB_REL="https://github.com/protocolbuffers/protobuf/releases"
# curl -LO $PB_REL/download/v30.2/protoc-30.2-linux-x86_64.zip
# unzip protoc-30.2-linux-x86_64.zip -d $HOME/.local
# export PATH="$PATH:$HOME/.local/bin"

# # so I used
# pip install protobuf==3.20.*

# # for storage problem i used
# pip install --upgrade google-cloud-storage

# # for error "import name 'aio' from 'grpc'"
# pip install --upgrade grpcio


In [1]:
import tempfile
# Show which directory tempfile.gettempdir() resolves to in this environment.
tempfile.gettempdir()

'/var/tmp'